recoverd: do_takeover_run() should mark when a takeover run is in progress
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "dlinklist.h"
31
32
33 /* most recent reload all ips request we need to perform during the 
34    next monitoring loop
35 */
36 struct reloadips_all_reply *reload_all_ips_request = NULL;
37
38 /* list of "ctdb ipreallocate" processes to call back when we have
39    finished the takeover run.
40 */
41 struct ip_reallocate_list {
42         struct ip_reallocate_list *next;
43         struct rd_memdump_reply *rd;
44 };
45
46 struct ctdb_banning_state {
47         uint32_t count;
48         struct timeval last_reported_time;
49 };
50
51 /*
52   private state of recovery daemon
53  */
54 struct ctdb_recoverd {
55         struct ctdb_context *ctdb;
56         uint32_t recmaster;
57         uint32_t num_active;
58         uint32_t num_connected;
59         uint32_t last_culprit_node;
60         struct ctdb_node_map *nodemap;
61         struct timeval priority_time;
62         bool need_takeover_run;
63         bool need_recovery;
64         uint32_t node_flags;
65         struct timed_event *send_election_te;
66         struct timed_event *election_timeout;
67         struct vacuum_info *vacuum_info;
68         TALLOC_CTX *ip_reallocate_ctx;
69         struct ip_reallocate_list *reallocate_callers;
70         bool takeover_run_in_progress;
71         TALLOC_CTX *ip_check_disable_ctx;
72         struct ctdb_control_get_ifaces *ifaces;
73         TALLOC_CTX *deferred_rebalance_ctx;
74 };
75
76 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
77 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
78
79 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
80
81 /*
82   ban a node for a period of time
83  */
84 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
85 {
86         int ret;
87         struct ctdb_context *ctdb = rec->ctdb;
88         struct ctdb_ban_time bantime;
89        
90         if (!ctdb_validate_pnn(ctdb, pnn)) {
91                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
92                 return;
93         }
94
95         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
96
97         bantime.pnn  = pnn;
98         bantime.time = ban_time;
99
100         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
101         if (ret != 0) {
102                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
103                 return;
104         }
105
106 }
107
108 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
109
110
111 /*
112   remember the trouble maker
113  */
114 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
115 {
116         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
117         struct ctdb_banning_state *ban_state;
118
119         if (culprit > ctdb->num_nodes) {
120                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
121                 return;
122         }
123
124         /* If we are banned or stopped, do not set other nodes as culprits */
125         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
126                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
127                 return;
128         }
129
130         if (ctdb->nodes[culprit]->ban_state == NULL) {
131                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
132                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
133
134                 
135         }
136         ban_state = ctdb->nodes[culprit]->ban_state;
137         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
138                 /* this was the first time in a long while this node
139                    misbehaved so we will forgive any old transgressions.
140                 */
141                 ban_state->count = 0;
142         }
143
144         ban_state->count += count;
145         ban_state->last_reported_time = timeval_current();
146         rec->last_culprit_node = culprit;
147 }
148
149 /*
150   remember the trouble maker
151  */
152 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
153 {
154         ctdb_set_culprit_count(rec, culprit, 1);
155 }
156
157
158 /* this callback is called for every node that failed to execute the
159    recovered event
160 */
161 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
162 {
163         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
164
165         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
166
167         ctdb_set_culprit(rec, node_pnn);
168 }
169
170 /*
171   run the "recovered" eventscript on all nodes
172  */
173 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
174 {
175         TALLOC_CTX *tmp_ctx;
176         uint32_t *nodes;
177         struct ctdb_context *ctdb = rec->ctdb;
178
179         tmp_ctx = talloc_new(ctdb);
180         CTDB_NO_MEMORY(ctdb, tmp_ctx);
181
182         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
183         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
184                                         nodes, 0,
185                                         CONTROL_TIMEOUT(), false, tdb_null,
186                                         NULL, recovered_fail_callback,
187                                         rec) != 0) {
188                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
189
190                 talloc_free(tmp_ctx);
191                 return -1;
192         }
193
194         talloc_free(tmp_ctx);
195         return 0;
196 }
197
198 /* this callback is called for every node that failed to execute the
199    start recovery event
200 */
201 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
202 {
203         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
204
205         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
206
207         ctdb_set_culprit(rec, node_pnn);
208 }
209
210 /*
211   run the "startrecovery" eventscript on all nodes
212  */
213 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
214 {
215         TALLOC_CTX *tmp_ctx;
216         uint32_t *nodes;
217         struct ctdb_context *ctdb = rec->ctdb;
218
219         tmp_ctx = talloc_new(ctdb);
220         CTDB_NO_MEMORY(ctdb, tmp_ctx);
221
222         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
223         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
224                                         nodes, 0,
225                                         CONTROL_TIMEOUT(), false, tdb_null,
226                                         NULL,
227                                         startrecovery_fail_callback,
228                                         rec) != 0) {
229                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
230                 talloc_free(tmp_ctx);
231                 return -1;
232         }
233
234         talloc_free(tmp_ctx);
235         return 0;
236 }
237
238 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
239 {
240         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
241                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
242                 return;
243         }
244         if (node_pnn < ctdb->num_nodes) {
245                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
246         }
247
248         if (node_pnn == ctdb->pnn) {
249                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
250         }
251 }
252
253 /*
254   update the node capabilities for all connected nodes
255  */
256 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
257 {
258         uint32_t *nodes;
259         TALLOC_CTX *tmp_ctx;
260
261         tmp_ctx = talloc_new(ctdb);
262         CTDB_NO_MEMORY(ctdb, tmp_ctx);
263
264         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
265         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
266                                         nodes, 0,
267                                         CONTROL_TIMEOUT(),
268                                         false, tdb_null,
269                                         async_getcap_callback, NULL,
270                                         NULL) != 0) {
271                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
272                 talloc_free(tmp_ctx);
273                 return -1;
274         }
275
276         talloc_free(tmp_ctx);
277         return 0;
278 }
279
280 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
281 {
282         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
283
284         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
285         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
286 }
287
288 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
289 {
290         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
291
292         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
293         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
294 }
295
296 /*
297   change recovery mode on all nodes
298  */
299 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
300 {
301         TDB_DATA data;
302         uint32_t *nodes;
303         TALLOC_CTX *tmp_ctx;
304
305         tmp_ctx = talloc_new(ctdb);
306         CTDB_NO_MEMORY(ctdb, tmp_ctx);
307
308         /* freeze all nodes */
309         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
310         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
311                 int i;
312
313                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
314                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
315                                                 nodes, i,
316                                                 CONTROL_TIMEOUT(),
317                                                 false, tdb_null,
318                                                 NULL,
319                                                 set_recmode_fail_callback,
320                                                 rec) != 0) {
321                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
322                                 talloc_free(tmp_ctx);
323                                 return -1;
324                         }
325                 }
326         }
327
328
329         data.dsize = sizeof(uint32_t);
330         data.dptr = (unsigned char *)&rec_mode;
331
332         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
333                                         nodes, 0,
334                                         CONTROL_TIMEOUT(),
335                                         false, data,
336                                         NULL, NULL,
337                                         NULL) != 0) {
338                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
339                 talloc_free(tmp_ctx);
340                 return -1;
341         }
342
343         talloc_free(tmp_ctx);
344         return 0;
345 }
346
347 /*
348   change recovery master on all node
349  */
350 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
351 {
352         TDB_DATA data;
353         TALLOC_CTX *tmp_ctx;
354         uint32_t *nodes;
355
356         tmp_ctx = talloc_new(ctdb);
357         CTDB_NO_MEMORY(ctdb, tmp_ctx);
358
359         data.dsize = sizeof(uint32_t);
360         data.dptr = (unsigned char *)&pnn;
361
362         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
363         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
364                                         nodes, 0,
365                                         CONTROL_TIMEOUT(), false, data,
366                                         NULL, NULL,
367                                         NULL) != 0) {
368                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
369                 talloc_free(tmp_ctx);
370                 return -1;
371         }
372
373         talloc_free(tmp_ctx);
374         return 0;
375 }
376
377 /* update all remote nodes to use the same db priority that we have
378    this can fail if the remove node has not yet been upgraded to 
379    support this function, so we always return success and never fail
380    a recovery if this call fails.
381 */
382 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
383         struct ctdb_node_map *nodemap, 
384         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
385 {
386         int db;
387         uint32_t *nodes;
388
389         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
390
391         /* step through all local databases */
392         for (db=0; db<dbmap->num;db++) {
393                 TDB_DATA data;
394                 struct ctdb_db_priority db_prio;
395                 int ret;
396
397                 db_prio.db_id     = dbmap->dbs[db].dbid;
398                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
399                 if (ret != 0) {
400                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
401                         continue;
402                 }
403
404                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
405
406                 data.dptr  = (uint8_t *)&db_prio;
407                 data.dsize = sizeof(db_prio);
408
409                 if (ctdb_client_async_control(ctdb,
410                                         CTDB_CONTROL_SET_DB_PRIORITY,
411                                         nodes, 0,
412                                         CONTROL_TIMEOUT(), false, data,
413                                         NULL, NULL,
414                                         NULL) != 0) {
415                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
416                 }
417         }
418
419         return 0;
420 }                       
421
422 /*
423   ensure all other nodes have attached to any databases that we have
424  */
425 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
426                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
427 {
428         int i, j, db, ret;
429         struct ctdb_dbid_map *remote_dbmap;
430
431         /* verify that all other nodes have all our databases */
432         for (j=0; j<nodemap->num; j++) {
433                 /* we dont need to ourself ourselves */
434                 if (nodemap->nodes[j].pnn == pnn) {
435                         continue;
436                 }
437                 /* dont check nodes that are unavailable */
438                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
439                         continue;
440                 }
441
442                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
443                                          mem_ctx, &remote_dbmap);
444                 if (ret != 0) {
445                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
446                         return -1;
447                 }
448
449                 /* step through all local databases */
450                 for (db=0; db<dbmap->num;db++) {
451                         const char *name;
452
453
454                         for (i=0;i<remote_dbmap->num;i++) {
455                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
456                                         break;
457                                 }
458                         }
459                         /* the remote node already have this database */
460                         if (i!=remote_dbmap->num) {
461                                 continue;
462                         }
463                         /* ok so we need to create this database */
464                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
465                                             mem_ctx, &name);
466                         if (ret != 0) {
467                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
468                                 return -1;
469                         }
470                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
471                                            mem_ctx, name,
472                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
473                         if (ret != 0) {
474                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
475                                 return -1;
476                         }
477                 }
478         }
479
480         return 0;
481 }
482
483
484 /*
485   ensure we are attached to any databases that anyone else is attached to
486  */
487 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
488                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
489 {
490         int i, j, db, ret;
491         struct ctdb_dbid_map *remote_dbmap;
492
493         /* verify that we have all database any other node has */
494         for (j=0; j<nodemap->num; j++) {
495                 /* we dont need to ourself ourselves */
496                 if (nodemap->nodes[j].pnn == pnn) {
497                         continue;
498                 }
499                 /* dont check nodes that are unavailable */
500                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
501                         continue;
502                 }
503
504                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
505                                          mem_ctx, &remote_dbmap);
506                 if (ret != 0) {
507                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
508                         return -1;
509                 }
510
511                 /* step through all databases on the remote node */
512                 for (db=0; db<remote_dbmap->num;db++) {
513                         const char *name;
514
515                         for (i=0;i<(*dbmap)->num;i++) {
516                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
517                                         break;
518                                 }
519                         }
520                         /* we already have this db locally */
521                         if (i!=(*dbmap)->num) {
522                                 continue;
523                         }
524                         /* ok so we need to create this database and
525                            rebuild dbmap
526                          */
527                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
528                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
529                         if (ret != 0) {
530                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
531                                           nodemap->nodes[j].pnn));
532                                 return -1;
533                         }
534                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
535                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
536                         if (ret != 0) {
537                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
538                                 return -1;
539                         }
540                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
541                         if (ret != 0) {
542                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
543                                 return -1;
544                         }
545                 }
546         }
547
548         return 0;
549 }
550
551
552 /*
553   pull the remote database contents from one node into the recdb
554  */
555 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
556                                     struct tdb_wrap *recdb, uint32_t dbid)
557 {
558         int ret;
559         TDB_DATA outdata;
560         struct ctdb_marshall_buffer *reply;
561         struct ctdb_rec_data *rec;
562         int i;
563         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
564
565         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
566                                CONTROL_TIMEOUT(), &outdata);
567         if (ret != 0) {
568                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
569                 talloc_free(tmp_ctx);
570                 return -1;
571         }
572
573         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
574
575         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
576                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
577                 talloc_free(tmp_ctx);
578                 return -1;
579         }
580         
581         rec = (struct ctdb_rec_data *)&reply->data[0];
582         
583         for (i=0;
584              i<reply->count;
585              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
586                 TDB_DATA key, data;
587                 struct ctdb_ltdb_header *hdr;
588                 TDB_DATA existing;
589                 
590                 key.dptr = &rec->data[0];
591                 key.dsize = rec->keylen;
592                 data.dptr = &rec->data[key.dsize];
593                 data.dsize = rec->datalen;
594                 
595                 hdr = (struct ctdb_ltdb_header *)data.dptr;
596
597                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
598                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
599                         talloc_free(tmp_ctx);
600                         return -1;
601                 }
602
603                 /* fetch the existing record, if any */
604                 existing = tdb_fetch(recdb->tdb, key);
605                 
606                 if (existing.dptr != NULL) {
607                         struct ctdb_ltdb_header header;
608                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
609                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
610                                          (unsigned)existing.dsize, srcnode));
611                                 free(existing.dptr);
612                                 talloc_free(tmp_ctx);
613                                 return -1;
614                         }
615                         header = *(struct ctdb_ltdb_header *)existing.dptr;
616                         free(existing.dptr);
617                         if (!(header.rsn < hdr->rsn ||
618                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
619                                 continue;
620                         }
621                 }
622                 
623                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
624                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
625                         talloc_free(tmp_ctx);
626                         return -1;                              
627                 }
628         }
629
630         talloc_free(tmp_ctx);
631
632         return 0;
633 }
634
635
636 struct pull_seqnum_cbdata {
637         int failed;
638         uint32_t pnn;
639         uint64_t seqnum;
640 };
641
642 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
643 {
644         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
645         uint64_t seqnum;
646
647         if (cb_data->failed != 0) {
648                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
649                 return;
650         }
651
652         if (res != 0) {
653                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
654                 cb_data->failed = 1;
655                 return;
656         }
657
658         if (outdata.dsize != sizeof(uint64_t)) {
659                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
660                 cb_data->failed = -1;
661                 return;
662         }
663
664         seqnum = *((uint64_t *)outdata.dptr);
665
666         if (seqnum > cb_data->seqnum) {
667                 cb_data->seqnum = seqnum;
668                 cb_data->pnn = node_pnn;
669         }
670 }
671
672 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
673 {
674         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
675
676         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
677         cb_data->failed = 1;
678 }
679
680 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
681                                 struct ctdb_recoverd *rec, 
682                                 struct ctdb_node_map *nodemap, 
683                                 struct tdb_wrap *recdb, uint32_t dbid)
684 {
685         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
686         uint32_t *nodes;
687         TDB_DATA data;
688         uint32_t outdata[2];
689         struct pull_seqnum_cbdata *cb_data;
690
691         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
692
693         outdata[0] = dbid;
694         outdata[1] = 0;
695
696         data.dsize = sizeof(outdata);
697         data.dptr  = (uint8_t *)&outdata[0];
698
699         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
700         if (cb_data == NULL) {
701                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
702                 talloc_free(tmp_ctx);
703                 return -1;
704         }
705
706         cb_data->failed = 0;
707         cb_data->pnn    = -1;
708         cb_data->seqnum = 0;
709         
710         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
711         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
712                                         nodes, 0,
713                                         CONTROL_TIMEOUT(), false, data,
714                                         pull_seqnum_cb,
715                                         pull_seqnum_fail_cb,
716                                         cb_data) != 0) {
717                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
718
719                 talloc_free(tmp_ctx);
720                 return -1;
721         }
722
723         if (cb_data->failed != 0) {
724                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
725                 talloc_free(tmp_ctx);
726                 return -1;
727         }
728
729         if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
730                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
731                 talloc_free(tmp_ctx);
732                 return -1;
733         }
734
735         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
736
737         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
738                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
739                 talloc_free(tmp_ctx);
740                 return -1;
741         }
742
743         talloc_free(tmp_ctx);
744         return 0;
745 }
746
747
748 /*
749   pull all the remote database contents into the recdb
750  */
751 static int pull_remote_database(struct ctdb_context *ctdb,
752                                 struct ctdb_recoverd *rec, 
753                                 struct ctdb_node_map *nodemap, 
754                                 struct tdb_wrap *recdb, uint32_t dbid,
755                                 bool persistent)
756 {
757         int j;
758
759         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
760                 int ret;
761                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
762                 if (ret == 0) {
763                         return 0;
764                 }
765         }
766
767         /* pull all records from all other nodes across onto this node
768            (this merges based on rsn)
769         */
770         for (j=0; j<nodemap->num; j++) {
771                 /* dont merge from nodes that are unavailable */
772                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
773                         continue;
774                 }
775                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
776                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
777                                  nodemap->nodes[j].pnn));
778                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
779                         return -1;
780                 }
781         }
782         
783         return 0;
784 }
785
786
787 /*
788   update flags on all active nodes
789  */
790 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
791 {
792         int ret;
793
794         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
795                 if (ret != 0) {
796                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
797                 return -1;
798         }
799
800         return 0;
801 }
802
803 /*
804   ensure all nodes have the same vnnmap we do
805  */
806 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
807                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
808 {
809         int j, ret;
810
811         /* push the new vnn map out to all the nodes */
812         for (j=0; j<nodemap->num; j++) {
813                 /* dont push to nodes that are unavailable */
814                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
815                         continue;
816                 }
817
818                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
819                 if (ret != 0) {
820                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
821                         return -1;
822                 }
823         }
824
825         return 0;
826 }
827
828
829 struct vacuum_info {
830         struct vacuum_info *next, *prev;
831         struct ctdb_recoverd *rec;
832         uint32_t srcnode;
833         struct ctdb_db_context *ctdb_db;
834         struct ctdb_marshall_buffer *recs;
835         struct ctdb_rec_data *r;
836 };
837
838 static void vacuum_fetch_next(struct vacuum_info *v);
839
840 /*
841   called when a vacuum fetch has completed - just free it and do the next one
842  */
843 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
844 {
845         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
846         talloc_free(state);
847         vacuum_fetch_next(v);
848 }
849
850
851 /*
852   process the next element from the vacuum list
853 */
854 static void vacuum_fetch_next(struct vacuum_info *v)
855 {
856         struct ctdb_call call;
857         struct ctdb_rec_data *r;
858
859         while (v->recs->count) {
860                 struct ctdb_client_call_state *state;
861                 TDB_DATA data;
862                 struct ctdb_ltdb_header *hdr;
863
864                 ZERO_STRUCT(call);
865                 call.call_id = CTDB_NULL_FUNC;
866                 call.flags = CTDB_IMMEDIATE_MIGRATION;
867                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
868
869                 r = v->r;
870                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
871                 v->recs->count--;
872
873                 call.key.dptr = &r->data[0];
874                 call.key.dsize = r->keylen;
875
876                 /* ensure we don't block this daemon - just skip a record if we can't get
877                    the chainlock */
878                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
879                         continue;
880                 }
881
882                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
883                 if (data.dptr == NULL) {
884                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
885                         continue;
886                 }
887
888                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
889                         free(data.dptr);
890                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
891                         continue;
892                 }
893                 
894                 hdr = (struct ctdb_ltdb_header *)data.dptr;
895                 if (hdr->dmaster == v->rec->ctdb->pnn) {
896                         /* its already local */
897                         free(data.dptr);
898                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
899                         continue;
900                 }
901
902                 free(data.dptr);
903
904                 state = ctdb_call_send(v->ctdb_db, &call);
905                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
906                 if (state == NULL) {
907                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
908                         talloc_free(v);
909                         return;
910                 }
911                 state->async.fn = vacuum_fetch_callback;
912                 state->async.private_data = v;
913                 return;
914         }
915
916         talloc_free(v);
917 }
918
919
920 /*
921   destroy a vacuum info structure
922  */
923 static int vacuum_info_destructor(struct vacuum_info *v)
924 {
925         DLIST_REMOVE(v->rec->vacuum_info, v);
926         return 0;
927 }
928
929
930 /*
931   handler for vacuum fetch
932 */
933 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
934                                  TDB_DATA data, void *private_data)
935 {
936         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
937         struct ctdb_marshall_buffer *recs;
938         int ret, i;
939         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
940         const char *name;
941         struct ctdb_dbid_map *dbmap=NULL;
942         bool persistent = false;
943         struct ctdb_db_context *ctdb_db;
944         struct ctdb_rec_data *r;
945         uint32_t srcnode;
946         struct vacuum_info *v;
947
948         recs = (struct ctdb_marshall_buffer *)data.dptr;
949         r = (struct ctdb_rec_data *)&recs->data[0];
950
951         if (recs->count == 0) {
952                 talloc_free(tmp_ctx);
953                 return;
954         }
955
956         srcnode = r->reqid;
957
958         for (v=rec->vacuum_info;v;v=v->next) {
959                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
960                         /* we're already working on records from this node */
961                         talloc_free(tmp_ctx);
962                         return;
963                 }
964         }
965
966         /* work out if the database is persistent */
967         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
968         if (ret != 0) {
969                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
970                 talloc_free(tmp_ctx);
971                 return;
972         }
973
974         for (i=0;i<dbmap->num;i++) {
975                 if (dbmap->dbs[i].dbid == recs->db_id) {
976                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
977                         break;
978                 }
979         }
980         if (i == dbmap->num) {
981                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
982                 talloc_free(tmp_ctx);
983                 return;         
984         }
985
986         /* find the name of this database */
987         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
988                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
989                 talloc_free(tmp_ctx);
990                 return;
991         }
992
993         /* attach to it */
994         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
995         if (ctdb_db == NULL) {
996                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
997                 talloc_free(tmp_ctx);
998                 return;
999         }
1000
1001         v = talloc_zero(rec, struct vacuum_info);
1002         if (v == NULL) {
1003                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1004                 talloc_free(tmp_ctx);
1005                 return;
1006         }
1007
1008         v->rec = rec;
1009         v->srcnode = srcnode;
1010         v->ctdb_db = ctdb_db;
1011         v->recs = talloc_memdup(v, recs, data.dsize);
1012         if (v->recs == NULL) {
1013                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1014                 talloc_free(v);
1015                 talloc_free(tmp_ctx);
1016                 return;         
1017         }
1018         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
1019
1020         DLIST_ADD(rec->vacuum_info, v);
1021
1022         talloc_set_destructor(v, vacuum_info_destructor);
1023
1024         vacuum_fetch_next(v);
1025         talloc_free(tmp_ctx);
1026 }
1027
1028
1029 /*
1030   called when ctdb_wait_timeout should finish
1031  */
1032 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1033                               struct timeval yt, void *p)
1034 {
1035         uint32_t *timed_out = (uint32_t *)p;
1036         (*timed_out) = 1;
1037 }
1038
1039 /*
1040   wait for a given number of seconds
1041  */
1042 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1043 {
1044         uint32_t timed_out = 0;
1045         time_t usecs = (secs - (time_t)secs) * 1000000;
1046         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1047         while (!timed_out) {
1048                 event_loop_once(ctdb->ev);
1049         }
1050 }
1051
1052 /*
1053   called when an election times out (ends)
1054  */
1055 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1056                                   struct timeval t, void *p)
1057 {
1058         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1059         rec->election_timeout = NULL;
1060         fast_start = false;
1061
1062         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1063 }
1064
1065
1066 /*
1067   wait for an election to finish. It finished election_timeout seconds after
1068   the last election packet is received
1069  */
1070 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1071 {
1072         struct ctdb_context *ctdb = rec->ctdb;
1073         while (rec->election_timeout) {
1074                 event_loop_once(ctdb->ev);
1075         }
1076 }
1077
1078 /*
1079   Update our local flags from all remote connected nodes. 
1080   This is only run when we are or we belive we are the recovery master
1081  */
1082 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1083 {
1084         int j;
1085         struct ctdb_context *ctdb = rec->ctdb;
1086         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1087
1088         /* get the nodemap for all active remote nodes and verify
1089            they are the same as for this node
1090          */
1091         for (j=0; j<nodemap->num; j++) {
1092                 struct ctdb_node_map *remote_nodemap=NULL;
1093                 int ret;
1094
1095                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1096                         continue;
1097                 }
1098                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1099                         continue;
1100                 }
1101
1102                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1103                                            mem_ctx, &remote_nodemap);
1104                 if (ret != 0) {
1105                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1106                                   nodemap->nodes[j].pnn));
1107                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1108                         talloc_free(mem_ctx);
1109                         return MONITOR_FAILED;
1110                 }
1111                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1112                         /* We should tell our daemon about this so it
1113                            updates its flags or else we will log the same 
1114                            message again in the next iteration of recovery.
1115                            Since we are the recovery master we can just as
1116                            well update the flags on all nodes.
1117                         */
1118                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1119                         if (ret != 0) {
1120                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1121                                 return -1;
1122                         }
1123
1124                         /* Update our local copy of the flags in the recovery
1125                            daemon.
1126                         */
1127                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1128                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1129                                  nodemap->nodes[j].flags));
1130                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1131                 }
1132                 talloc_free(remote_nodemap);
1133         }
1134         talloc_free(mem_ctx);
1135         return MONITOR_OK;
1136 }
1137
1138
1139 /* Create a new random generation ip. 
1140    The generation id can not be the INVALID_GENERATION id
1141 */
1142 static uint32_t new_generation(void)
1143 {
1144         uint32_t generation;
1145
1146         while (1) {
1147                 generation = random();
1148
1149                 if (generation != INVALID_GENERATION) {
1150                         break;
1151                 }
1152         }
1153
1154         return generation;
1155 }
1156
1157
1158 /*
1159   create a temporary working database
1160  */
1161 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1162 {
1163         char *name;
1164         struct tdb_wrap *recdb;
1165         unsigned tdb_flags;
1166
1167         /* open up the temporary recovery database */
1168         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1169                                ctdb->db_directory_state,
1170                                ctdb->pnn);
1171         if (name == NULL) {
1172                 return NULL;
1173         }
1174         unlink(name);
1175
1176         tdb_flags = TDB_NOLOCK;
1177         if (ctdb->valgrinding) {
1178                 tdb_flags |= TDB_NOMMAP;
1179         }
1180         tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
1181
1182         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1183                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1184         if (recdb == NULL) {
1185                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1186         }
1187
1188         talloc_free(name);
1189
1190         return recdb;
1191 }
1192
1193
1194 /* 
1195    a traverse function for pulling all relevant records from recdb
1196  */
1197 struct recdb_data {
1198         struct ctdb_context *ctdb;
1199         struct ctdb_marshall_buffer *recdata;
1200         uint32_t len;
1201         uint32_t allocated_len;
1202         bool failed;
1203         bool persistent;
1204 };
1205
1206 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1207 {
1208         struct recdb_data *params = (struct recdb_data *)p;
1209         struct ctdb_rec_data *rec;
1210         struct ctdb_ltdb_header *hdr;
1211
1212         /*
1213          * skip empty records - but NOT for persistent databases:
1214          *
1215          * The record-by-record mode of recovery deletes empty records.
1216          * For persistent databases, this can lead to data corruption
1217          * by deleting records that should be there:
1218          *
1219          * - Assume the cluster has been running for a while.
1220          *
1221          * - A record R in a persistent database has been created and
1222          *   deleted a couple of times, the last operation being deletion,
1223          *   leaving an empty record with a high RSN, say 10.
1224          *
1225          * - Now a node N is turned off.
1226          *
1227          * - This leaves the local database copy of D on N with the empty
1228          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1229          *   the copy of record R.
1230          *
1231          * - Now the record is created again while node N is turned off.
1232          *   This creates R with RSN = 1 on all nodes except for N.
1233          *
1234          * - Now node N is turned on again. The following recovery will chose
1235          *   the older empty copy of R due to RSN 10 > RSN 1.
1236          *
1237          * ==> Hence the record is gone after the recovery.
1238          *
1239          * On databases like Samba's registry, this can damage the higher-level
1240          * data structures built from the various tdb-level records.
1241          */
1242         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1243                 return 0;
1244         }
1245
1246         /* update the dmaster field to point to us */
1247         hdr = (struct ctdb_ltdb_header *)data.dptr;
1248         if (!params->persistent) {
1249                 hdr->dmaster = params->ctdb->pnn;
1250                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1251         }
1252
1253         /* add the record to the blob ready to send to the nodes */
1254         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1255         if (rec == NULL) {
1256                 params->failed = true;
1257                 return -1;
1258         }
1259         if (params->len + rec->length >= params->allocated_len) {
1260                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1261                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1262         }
1263         if (params->recdata == NULL) {
1264                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1265                          rec->length + params->len));
1266                 params->failed = true;
1267                 return -1;
1268         }
1269         params->recdata->count++;
1270         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1271         params->len += rec->length;
1272         talloc_free(rec);
1273
1274         return 0;
1275 }
1276
1277 /*
1278   push the recdb database out to all nodes
1279  */
1280 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1281                                bool persistent,
1282                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1283 {
1284         struct recdb_data params;
1285         struct ctdb_marshall_buffer *recdata;
1286         TDB_DATA outdata;
1287         TALLOC_CTX *tmp_ctx;
1288         uint32_t *nodes;
1289
1290         tmp_ctx = talloc_new(ctdb);
1291         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1292
1293         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1294         CTDB_NO_MEMORY(ctdb, recdata);
1295
1296         recdata->db_id = dbid;
1297
1298         params.ctdb = ctdb;
1299         params.recdata = recdata;
1300         params.len = offsetof(struct ctdb_marshall_buffer, data);
1301         params.allocated_len = params.len;
1302         params.failed = false;
1303         params.persistent = persistent;
1304
1305         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1306                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1307                 talloc_free(params.recdata);
1308                 talloc_free(tmp_ctx);
1309                 return -1;
1310         }
1311
1312         if (params.failed) {
1313                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1314                 talloc_free(params.recdata);
1315                 talloc_free(tmp_ctx);
1316                 return -1;              
1317         }
1318
1319         recdata = params.recdata;
1320
1321         outdata.dptr = (void *)recdata;
1322         outdata.dsize = params.len;
1323
1324         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1325         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1326                                         nodes, 0,
1327                                         CONTROL_TIMEOUT(), false, outdata,
1328                                         NULL, NULL,
1329                                         NULL) != 0) {
1330                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1331                 talloc_free(recdata);
1332                 talloc_free(tmp_ctx);
1333                 return -1;
1334         }
1335
1336         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1337                   dbid, recdata->count));
1338
1339         talloc_free(recdata);
1340         talloc_free(tmp_ctx);
1341
1342         return 0;
1343 }
1344
1345
1346 /*
1347   go through a full recovery on one database 
1348  */
1349 static int recover_database(struct ctdb_recoverd *rec, 
1350                             TALLOC_CTX *mem_ctx,
1351                             uint32_t dbid,
1352                             bool persistent,
1353                             uint32_t pnn, 
1354                             struct ctdb_node_map *nodemap,
1355                             uint32_t transaction_id)
1356 {
1357         struct tdb_wrap *recdb;
1358         int ret;
1359         struct ctdb_context *ctdb = rec->ctdb;
1360         TDB_DATA data;
1361         struct ctdb_control_wipe_database w;
1362         uint32_t *nodes;
1363
1364         recdb = create_recdb(ctdb, mem_ctx);
1365         if (recdb == NULL) {
1366                 return -1;
1367         }
1368
1369         /* pull all remote databases onto the recdb */
1370         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1371         if (ret != 0) {
1372                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1373                 return -1;
1374         }
1375
1376         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1377
1378         /* wipe all the remote databases. This is safe as we are in a transaction */
1379         w.db_id = dbid;
1380         w.transaction_id = transaction_id;
1381
1382         data.dptr = (void *)&w;
1383         data.dsize = sizeof(w);
1384
1385         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1386         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1387                                         nodes, 0,
1388                                         CONTROL_TIMEOUT(), false, data,
1389                                         NULL, NULL,
1390                                         NULL) != 0) {
1391                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1392                 talloc_free(recdb);
1393                 return -1;
1394         }
1395         
1396         /* push out the correct database. This sets the dmaster and skips 
1397            the empty records */
1398         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1399         if (ret != 0) {
1400                 talloc_free(recdb);
1401                 return -1;
1402         }
1403
1404         /* all done with this database */
1405         talloc_free(recdb);
1406
1407         return 0;
1408 }
1409
1410 /*
1411   reload the nodes file 
1412 */
1413 static void reload_nodes_file(struct ctdb_context *ctdb)
1414 {
1415         ctdb->nodes = NULL;
1416         ctdb_load_nodes_file(ctdb);
1417 }
1418
1419 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1420                                          struct ctdb_recoverd *rec,
1421                                          struct ctdb_node_map *nodemap,
1422                                          uint32_t *culprit)
1423 {
1424         int j;
1425         int ret;
1426
1427         if (ctdb->num_nodes != nodemap->num) {
1428                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1429                                   ctdb->num_nodes, nodemap->num));
1430                 if (culprit) {
1431                         *culprit = ctdb->pnn;
1432                 }
1433                 return -1;
1434         }
1435
1436         for (j=0; j<nodemap->num; j++) {
1437                 /* For readability */
1438                 struct ctdb_node *node = ctdb->nodes[j];
1439
1440                 /* release any existing data */
1441                 if (node->known_public_ips) {
1442                         talloc_free(node->known_public_ips);
1443                         node->known_public_ips = NULL;
1444                 }
1445                 if (node->available_public_ips) {
1446                         talloc_free(node->available_public_ips);
1447                         node->available_public_ips = NULL;
1448                 }
1449
1450                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1451                         continue;
1452                 }
1453
1454                 /* Retrieve the list of known public IPs from the node */
1455                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1456                                         CONTROL_TIMEOUT(),
1457                                         node->pnn,
1458                                         ctdb->nodes,
1459                                         0,
1460                                         &node->known_public_ips);
1461                 if (ret != 0) {
1462                         DEBUG(DEBUG_ERR,
1463                               ("Failed to read known public IPs from node: %u\n",
1464                                node->pnn));
1465                         if (culprit) {
1466                                 *culprit = node->pnn;
1467                         }
1468                         return -1;
1469                 }
1470
1471                 if (ctdb->do_checkpublicip &&
1472                     (rec->ip_check_disable_ctx == NULL) &&
1473                     verify_remote_ip_allocation(ctdb,
1474                                                  node->known_public_ips,
1475                                                  node->pnn)) {
1476                         DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
1477                         rec->need_takeover_run = true;
1478                 }
1479
1480                 /* Retrieve the list of available public IPs from the node */
1481                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1482                                         CONTROL_TIMEOUT(),
1483                                         node->pnn,
1484                                         ctdb->nodes,
1485                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1486                                         &node->available_public_ips);
1487                 if (ret != 0) {
1488                         DEBUG(DEBUG_ERR,
1489                               ("Failed to read available public IPs from node: %u\n",
1490                                node->pnn));
1491                         if (culprit) {
1492                                 *culprit = node->pnn;
1493                         }
1494                         return -1;
1495                 }
1496         }
1497
1498         return 0;
1499 }
1500
1501 /* when we start a recovery, make sure all nodes use the same reclock file
1502    setting
1503 */
1504 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1505 {
1506         struct ctdb_context *ctdb = rec->ctdb;
1507         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1508         TDB_DATA data;
1509         uint32_t *nodes;
1510
1511         if (ctdb->recovery_lock_file == NULL) {
1512                 data.dptr  = NULL;
1513                 data.dsize = 0;
1514         } else {
1515                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1516                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1517         }
1518
1519         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1520         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1521                                         nodes, 0,
1522                                         CONTROL_TIMEOUT(),
1523                                         false, data,
1524                                         NULL, NULL,
1525                                         rec) != 0) {
1526                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1527                 talloc_free(tmp_ctx);
1528                 return -1;
1529         }
1530
1531         talloc_free(tmp_ctx);
1532         return 0;
1533 }
1534
1535
1536 /*
1537  * this callback is called for every node that failed to execute ctdb_takeover_run()
1538  * and set flag to re-run takeover run.
1539  */
1540 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1541 {
1542         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1543
1544         if (callback_data != NULL) {
1545                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1546
1547                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1548
1549                 ctdb_set_culprit(rec, node_pnn);
1550         }
1551 }
1552
1553
1554 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1555 {
1556         struct ctdb_context *ctdb = rec->ctdb;
1557         int i;
1558         struct ctdb_banning_state *ban_state;
1559
1560         *self_ban = false;
1561         for (i=0; i<ctdb->num_nodes; i++) {
1562                 if (ctdb->nodes[i]->ban_state == NULL) {
1563                         continue;
1564                 }
1565                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1566                 if (ban_state->count < 2*ctdb->num_nodes) {
1567                         continue;
1568                 }
1569
1570                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1571                         ctdb->nodes[i]->pnn, ban_state->count,
1572                         ctdb->tunable.recovery_ban_period));
1573                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1574                 ban_state->count = 0;
1575
1576                 /* Banning ourself? */
1577                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1578                         *self_ban = true;
1579                 }
1580         }
1581 }
1582
1583 static bool do_takeover_run(struct ctdb_recoverd *rec,
1584                             struct ctdb_node_map *nodemap,
1585                             bool banning_credits_on_fail)
1586 {
1587         int ret;
1588         bool ok;
1589
1590         if (rec->takeover_run_in_progress) {
1591                 DEBUG(DEBUG_ERR, (__location__
1592                                   " takeover run already in progress \n"));
1593                 ok = false;
1594                 goto done;
1595         }
1596
1597         rec->takeover_run_in_progress = true;
1598
1599         ret = ctdb_takeover_run(rec->ctdb, nodemap, takeover_fail_callback,
1600                                 banning_credits_on_fail ? rec : NULL);
1601         if (ret != 0) {
1602                 DEBUG(DEBUG_ERR, ("IP reallocation failed\n"));
1603                 ok = false;
1604                 goto done;
1605         }
1606
1607         ok = true;
1608 done:
1609         rec->need_takeover_run = !ok;
1610         rec->takeover_run_in_progress = false;
1611         return ok;
1612 }
1613
1614
1615 /*
1616   we are the recmaster, and recovery is needed - start a recovery run
1617  */
1618 static int do_recovery(struct ctdb_recoverd *rec, 
1619                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1620                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1621 {
1622         struct ctdb_context *ctdb = rec->ctdb;
1623         int i, j, ret;
1624         uint32_t generation;
1625         struct ctdb_dbid_map *dbmap;
1626         TDB_DATA data;
1627         uint32_t *nodes;
1628         struct timeval start_time;
1629         uint32_t culprit = (uint32_t)-1;
1630         bool self_ban;
1631
1632         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1633
1634         /* if recovery fails, force it again */
1635         rec->need_recovery = true;
1636
1637         ban_misbehaving_nodes(rec, &self_ban);
1638         if (self_ban) {
1639                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1640                 return -1;
1641         }
1642
1643         if (ctdb->tunable.verify_recovery_lock != 0) {
1644                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1645                 start_time = timeval_current();
1646                 if (!ctdb_recovery_lock(ctdb, true)) {
1647                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1648                                          "and ban ourself for %u seconds\n",
1649                                          ctdb->tunable.recovery_ban_period));
1650                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1651                         return -1;
1652                 }
1653                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1654                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1655         }
1656
1657         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1658
1659         /* get a list of all databases */
1660         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1661         if (ret != 0) {
1662                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1663                 return -1;
1664         }
1665
1666         /* we do the db creation before we set the recovery mode, so the freeze happens
1667            on all databases we will be dealing with. */
1668
1669         /* verify that we have all the databases any other node has */
1670         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1671         if (ret != 0) {
1672                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1673                 return -1;
1674         }
1675
1676         /* verify that all other nodes have all our databases */
1677         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1678         if (ret != 0) {
1679                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1680                 return -1;
1681         }
1682         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1683
1684         /* update the database priority for all remote databases */
1685         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1686         if (ret != 0) {
1687                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1688         }
1689         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1690
1691
1692         /* update all other nodes to use the same setting for reclock files
1693            as the local recovery master.
1694         */
1695         sync_recovery_lock_file_across_cluster(rec);
1696
1697         /* set recovery mode to active on all nodes */
1698         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1699         if (ret != 0) {
1700                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1701                 return -1;
1702         }
1703
1704         /* execute the "startrecovery" event script on all nodes */
1705         ret = run_startrecovery_eventscript(rec, nodemap);
1706         if (ret!=0) {
1707                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1708                 return -1;
1709         }
1710
1711         /*
1712           update all nodes to have the same flags that we have
1713          */
1714         for (i=0;i<nodemap->num;i++) {
1715                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1716                         continue;
1717                 }
1718
1719                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1720                 if (ret != 0) {
1721                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1722                         return -1;
1723                 }
1724         }
1725
1726         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1727
1728         /* pick a new generation number */
1729         generation = new_generation();
1730
1731         /* change the vnnmap on this node to use the new generation 
1732            number but not on any other nodes.
1733            this guarantees that if we abort the recovery prematurely
1734            for some reason (a node stops responding?)
1735            that we can just return immediately and we will reenter
1736            recovery shortly again.
1737            I.e. we deliberately leave the cluster with an inconsistent
1738            generation id to allow us to abort recovery at any stage and
1739            just restart it from scratch.
1740          */
1741         vnnmap->generation = generation;
1742         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1743         if (ret != 0) {
1744                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1745                 return -1;
1746         }
1747
1748         data.dptr = (void *)&generation;
1749         data.dsize = sizeof(uint32_t);
1750
1751         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1752         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1753                                         nodes, 0,
1754                                         CONTROL_TIMEOUT(), false, data,
1755                                         NULL,
1756                                         transaction_start_fail_callback,
1757                                         rec) != 0) {
1758                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1759                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1760                                         nodes, 0,
1761                                         CONTROL_TIMEOUT(), false, tdb_null,
1762                                         NULL,
1763                                         NULL,
1764                                         NULL) != 0) {
1765                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1766                 }
1767                 return -1;
1768         }
1769
1770         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1771
1772         for (i=0;i<dbmap->num;i++) {
1773                 ret = recover_database(rec, mem_ctx,
1774                                        dbmap->dbs[i].dbid,
1775                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1776                                        pnn, nodemap, generation);
1777                 if (ret != 0) {
1778                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1779                         return -1;
1780                 }
1781         }
1782
1783         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1784
1785         /* commit all the changes */
1786         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1787                                         nodes, 0,
1788                                         CONTROL_TIMEOUT(), false, data,
1789                                         NULL, NULL,
1790                                         NULL) != 0) {
1791                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1792                 return -1;
1793         }
1794
1795         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1796         
1797
1798         /* update the capabilities for all nodes */
1799         ret = update_capabilities(ctdb, nodemap);
1800         if (ret!=0) {
1801                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1802                 return -1;
1803         }
1804
1805         /* build a new vnn map with all the currently active and
1806            unbanned nodes */
1807         generation = new_generation();
1808         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1809         CTDB_NO_MEMORY(ctdb, vnnmap);
1810         vnnmap->generation = generation;
1811         vnnmap->size = 0;
1812         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1813         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1814         for (i=j=0;i<nodemap->num;i++) {
1815                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1816                         continue;
1817                 }
1818                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1819                         /* this node can not be an lmaster */
1820                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1821                         continue;
1822                 }
1823
1824                 vnnmap->size++;
1825                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1826                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1827                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1828
1829         }
1830         if (vnnmap->size == 0) {
1831                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1832                 vnnmap->size++;
1833                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1834                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1835                 vnnmap->map[0] = pnn;
1836         }       
1837
1838         /* update to the new vnnmap on all nodes */
1839         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1840         if (ret != 0) {
1841                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1842                 return -1;
1843         }
1844
1845         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1846
1847         /* update recmaster to point to us for all nodes */
1848         ret = set_recovery_master(ctdb, nodemap, pnn);
1849         if (ret!=0) {
1850                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1851                 return -1;
1852         }
1853
1854         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1855
1856         /*
1857           update all nodes to have the same flags that we have
1858          */
1859         for (i=0;i<nodemap->num;i++) {
1860                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1861                         continue;
1862                 }
1863
1864                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1865                 if (ret != 0) {
1866                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1867                         return -1;
1868                 }
1869         }
1870
1871         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1872
1873         /* disable recovery mode */
1874         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1875         if (ret != 0) {
1876                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1877                 return -1;
1878         }
1879
1880         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1881
1882         /* Fetch known/available public IPs from each active node */
1883         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1884         if (ret != 0) {
1885                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1886                                  culprit));
1887                 rec->need_takeover_run = true;
1888                 return -1;
1889         }
1890
1891         do_takeover_run(rec, nodemap, false);
1892
1893         /* execute the "recovered" event script on all nodes */
1894         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
1895         if (ret!=0) {
1896                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1897                 return -1;
1898         }
1899
1900         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1901
1902         /* send a message to all clients telling them that the cluster 
1903            has been reconfigured */
1904         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1905
1906         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1907
1908         rec->need_recovery = false;
1909
1910         /* we managed to complete a full recovery, make sure to forgive
1911            any past sins by the nodes that could now participate in the
1912            recovery.
1913         */
1914         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1915         for (i=0;i<nodemap->num;i++) {
1916                 struct ctdb_banning_state *ban_state;
1917
1918                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1919                         continue;
1920                 }
1921
1922                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1923                 if (ban_state == NULL) {
1924                         continue;
1925                 }
1926
1927                 ban_state->count = 0;
1928         }
1929
1930
1931         /* We just finished a recovery successfully. 
1932            We now wait for rerecovery_timeout before we allow 
1933            another recovery to take place.
1934         */
1935         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1936         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1937         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1938
1939         return 0;
1940 }
1941
1942
1943 /*
1944   elections are won by first checking the number of connected nodes, then
1945   the priority time, then the pnn
1946  */
1947 struct election_message {
1948         uint32_t num_connected;
1949         struct timeval priority_time;
1950         uint32_t pnn;
1951         uint32_t node_flags;
1952 };
1953
1954 /*
1955   form this nodes election data
1956  */
1957 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1958 {
1959         int ret, i;
1960         struct ctdb_node_map *nodemap;
1961         struct ctdb_context *ctdb = rec->ctdb;
1962
1963         ZERO_STRUCTP(em);
1964
1965         em->pnn = rec->ctdb->pnn;
1966         em->priority_time = rec->priority_time;
1967
1968         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1969         if (ret != 0) {
1970                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1971                 return;
1972         }
1973
1974         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1975         em->node_flags = rec->node_flags;
1976
1977         for (i=0;i<nodemap->num;i++) {
1978                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1979                         em->num_connected++;
1980                 }
1981         }
1982
1983         /* we shouldnt try to win this election if we cant be a recmaster */
1984         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1985                 em->num_connected = 0;
1986                 em->priority_time = timeval_current();
1987         }
1988
1989         talloc_free(nodemap);
1990 }
1991
1992 /*
1993   see if the given election data wins
1994  */
1995 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1996 {
1997         struct election_message myem;
1998         int cmp = 0;
1999
2000         ctdb_election_data(rec, &myem);
2001
2002         /* we cant win if we dont have the recmaster capability */
2003         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2004                 return false;
2005         }
2006
2007         /* we cant win if we are banned */
2008         if (rec->node_flags & NODE_FLAGS_BANNED) {
2009                 return false;
2010         }
2011
2012         /* we cant win if we are stopped */
2013         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2014                 return false;
2015         }
2016
2017         /* we will automatically win if the other node is banned */
2018         if (em->node_flags & NODE_FLAGS_BANNED) {
2019                 return true;
2020         }
2021
2022         /* we will automatically win if the other node is banned */
2023         if (em->node_flags & NODE_FLAGS_STOPPED) {
2024                 return true;
2025         }
2026
2027         /* try to use the most connected node */
2028         if (cmp == 0) {
2029                 cmp = (int)myem.num_connected - (int)em->num_connected;
2030         }
2031
2032         /* then the longest running node */
2033         if (cmp == 0) {
2034                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2035         }
2036
2037         if (cmp == 0) {
2038                 cmp = (int)myem.pnn - (int)em->pnn;
2039         }
2040
2041         return cmp > 0;
2042 }
2043
2044 /*
2045   send out an election request
2046  */
2047 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
2048 {
2049         int ret;
2050         TDB_DATA election_data;
2051         struct election_message emsg;
2052         uint64_t srvid;
2053         struct ctdb_context *ctdb = rec->ctdb;
2054
2055         srvid = CTDB_SRVID_RECOVERY;
2056
2057         ctdb_election_data(rec, &emsg);
2058
2059         election_data.dsize = sizeof(struct election_message);
2060         election_data.dptr  = (unsigned char *)&emsg;
2061
2062
2063         /* send an election message to all active nodes */
2064         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2065         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2066
2067
2068         /* A new node that is already frozen has entered the cluster.
2069            The existing nodes are not frozen and dont need to be frozen
2070            until the election has ended and we start the actual recovery
2071         */
2072         if (update_recmaster == true) {
2073                 /* first we assume we will win the election and set 
2074                    recoverymaster to be ourself on the current node
2075                  */
2076                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2077                 if (ret != 0) {
2078                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2079                         return -1;
2080                 }
2081         }
2082
2083
2084         return 0;
2085 }
2086
2087 /*
2088   this function will unban all nodes in the cluster
2089 */
2090 static void unban_all_nodes(struct ctdb_context *ctdb)
2091 {
2092         int ret, i;
2093         struct ctdb_node_map *nodemap;
2094         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2095         
2096         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2097         if (ret != 0) {
2098                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2099                 return;
2100         }
2101
2102         for (i=0;i<nodemap->num;i++) {
2103                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2104                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2105                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
2106                 }
2107         }
2108
2109         talloc_free(tmp_ctx);
2110 }
2111
2112
2113 /*
2114   we think we are winning the election - send a broadcast election request
2115  */
2116 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2117 {
2118         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2119         int ret;
2120
2121         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
2122         if (ret != 0) {
2123                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2124         }
2125
2126         talloc_free(rec->send_election_te);
2127         rec->send_election_te = NULL;
2128 }
2129
2130 /*
2131   handler for memory dumps
2132 */
2133 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2134                              TDB_DATA data, void *private_data)
2135 {
2136         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2137         TDB_DATA *dump;
2138         int ret;
2139         struct rd_memdump_reply *rd;
2140
2141         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2142                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2143                 talloc_free(tmp_ctx);
2144                 return;
2145         }
2146         rd = (struct rd_memdump_reply *)data.dptr;
2147
2148         dump = talloc_zero(tmp_ctx, TDB_DATA);
2149         if (dump == NULL) {
2150                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2151                 talloc_free(tmp_ctx);
2152                 return;
2153         }
2154         ret = ctdb_dump_memory(ctdb, dump);
2155         if (ret != 0) {
2156                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2157                 talloc_free(tmp_ctx);
2158                 return;
2159         }
2160
2161 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
2162
2163         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2164         if (ret != 0) {
2165                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2166                 talloc_free(tmp_ctx);
2167                 return;
2168         }
2169
2170         talloc_free(tmp_ctx);
2171 }
2172
2173 /*
2174   handler for getlog
2175 */
2176 static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2177                            TDB_DATA data, void *private_data)
2178 {
2179         struct ctdb_get_log_addr *log_addr;
2180         pid_t child;
2181
2182         if (data.dsize != sizeof(struct ctdb_get_log_addr)) {
2183                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2184                 return;
2185         }
2186         log_addr = (struct ctdb_get_log_addr *)data.dptr;
2187
2188         child = ctdb_fork_no_free_ringbuffer(ctdb);
2189         if (child == (pid_t)-1) {
2190                 DEBUG(DEBUG_ERR,("Failed to fork a log collector child\n"));
2191                 return;
2192         }
2193
2194         if (child == 0) {
2195                 ctdb_set_process_name("ctdb_rec_log_collector");
2196                 if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
2197                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch log collector child into client mode.\n"));
2198                         _exit(1);
2199                 }
2200                 ctdb_collect_log(ctdb, log_addr);
2201                 _exit(0);
2202         }
2203 }
2204
2205 /*
2206   handler for clearlog
2207 */
2208 static void clearlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2209                              TDB_DATA data, void *private_data)
2210 {
2211         ctdb_clear_log(ctdb);
2212 }
2213
2214 /*
2215   handler for reload_nodes
2216 */
2217 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2218                              TDB_DATA data, void *private_data)
2219 {
2220         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2221
2222         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2223
2224         reload_nodes_file(rec->ctdb);
2225 }
2226
2227
2228 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
2229                               struct timeval yt, void *p)
2230 {
2231         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2232
2233         talloc_free(rec->ip_check_disable_ctx);
2234         rec->ip_check_disable_ctx = NULL;
2235 }
2236
2237
2238 static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
2239                                   struct timeval t, void *p)
2240 {
2241         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2242
2243         DEBUG(DEBUG_NOTICE,
2244               ("Rebalance all nodes that have had ip assignment changes.\n"));
2245
2246         do_takeover_run(rec, rec->nodemap, false);
2247
2248         talloc_free(rec->deferred_rebalance_ctx);
2249         rec->deferred_rebalance_ctx = NULL;
2250 }
2251
2252         
2253 static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2254                              TDB_DATA data, void *private_data)
2255 {
2256         uint32_t pnn;
2257         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2258
2259         if (data.dsize != sizeof(uint32_t)) {
2260                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2261                 return;
2262         }
2263
2264         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2265                 return;
2266         }
2267
2268         pnn = *(uint32_t *)&data.dptr[0];
2269
2270         lcp2_forcerebalance(ctdb, pnn);
2271         DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
2272
2273         if (rec->deferred_rebalance_ctx != NULL) {
2274                 talloc_free(rec->deferred_rebalance_ctx);
2275         }
2276         rec->deferred_rebalance_ctx = talloc_new(rec);
2277         event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
2278                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2279                         ctdb_rebalance_timeout, rec);
2280 }
2281
2282
2283
2284 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2285                              TDB_DATA data, void *private_data)
2286 {
2287         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2288         struct ctdb_public_ip *ip;
2289
2290         if (rec->recmaster != rec->ctdb->pnn) {
2291                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2292                 return;
2293         }
2294
2295         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2296                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2297                 return;
2298         }
2299
2300         ip = (struct ctdb_public_ip *)data.dptr;
2301
2302         update_ip_assignment_tree(rec->ctdb, ip);
2303 }
2304
2305
2306 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2307                              TDB_DATA data, void *private_data)
2308 {
2309         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2310         uint32_t timeout;
2311
2312         if (rec->ip_check_disable_ctx != NULL) {
2313                 talloc_free(rec->ip_check_disable_ctx);
2314                 rec->ip_check_disable_ctx = NULL;
2315         }
2316
2317         if (data.dsize != sizeof(uint32_t)) {
2318                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2319                                  "expexting %lu\n", (long unsigned)data.dsize,
2320                                  (long unsigned)sizeof(uint32_t)));
2321                 return;
2322         }
2323         if (data.dptr == NULL) {
2324                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
2325                 return;
2326         }
2327
2328         timeout = *((uint32_t *)data.dptr);
2329
2330         if (timeout == 0) {
2331                 DEBUG(DEBUG_NOTICE,("Reenabling ip check\n"));
2332                 return;
2333         }
2334                 
2335         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
2336
2337         rec->ip_check_disable_ctx = talloc_new(rec);
2338         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2339
2340         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2341 }
2342
2343
2344 /*
2345   handler for reload all ips.
2346 */
2347 static void ip_reloadall_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2348                              TDB_DATA data, void *private_data)
2349 {
2350         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2351
2352         if (data.dsize != sizeof(struct reloadips_all_reply)) {
2353                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2354                 return;
2355         }
2356
2357         reload_all_ips_request = (struct reloadips_all_reply *)talloc_steal(rec, data.dptr);
2358
2359         DEBUG(DEBUG_NOTICE,("RELOAD_ALL_IPS message received from node:%d srvid:%d\n", reload_all_ips_request->pnn, (int)reload_all_ips_request->srvid));
2360         return;
2361 }
2362
2363 static void async_reloadips_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2364 {
2365         uint32_t *status = callback_data;
2366
2367         if (res != 0) {
2368                 DEBUG(DEBUG_ERR,("Reload ips all failed on node %d\n", node_pnn));
2369                 *status = 1;
2370         }
2371 }
2372
2373 static int
2374 reload_all_ips(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, struct reloadips_all_reply *rips)
2375 {
2376         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2377         uint32_t *nodes;
2378         uint32_t status;
2379         int i;
2380
2381         DEBUG(DEBUG_ERR,("RELOAD ALL IPS on all active nodes\n"));
2382         for (i = 0; i< nodemap->num; i++) {
2383                 if (nodemap->nodes[i].flags != 0) {
2384                         DEBUG(DEBUG_ERR, ("Can not reload ips on all nodes. Node %d is not up and healthy\n", i));
2385                         talloc_free(tmp_ctx);
2386                         return -1;
2387                 }
2388         }
2389
2390         /* send the flags update to all connected nodes */
2391         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2392         status = 0;
2393         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
2394                                         nodes, 0,
2395                                         CONTROL_TIMEOUT(),
2396                                         false, tdb_null,
2397                                         async_reloadips_callback, NULL,
2398                                         &status) != 0) {
2399                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2400                 talloc_free(tmp_ctx);
2401                 return -1;
2402         }
2403
2404         if (status != 0) {
2405                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2406                 talloc_free(tmp_ctx);
2407                 return -1;
2408         }
2409
2410         ctdb_client_send_message(ctdb, rips->pnn, rips->srvid, tdb_null);
2411
2412         talloc_free(tmp_ctx);
2413         return 0;
2414 }
2415
2416
2417 /*
2418   handler for ip reallocate, just add it to the list of callers and 
2419   handle this later in the monitor_cluster loop so we do not recurse
2420   with other callers to takeover_run()
2421 */
2422 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2423                              TDB_DATA data, void *private_data)
2424 {
2425         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2426         struct ip_reallocate_list *caller;
2427
2428         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2429                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2430                 return;
2431         }
2432
2433         if (rec->ip_reallocate_ctx == NULL) {
2434                 rec->ip_reallocate_ctx = talloc_new(rec);
2435                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2436         }
2437
2438         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2439         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2440
2441         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2442         caller->next = rec->reallocate_callers;
2443         rec->reallocate_callers = caller;
2444
2445         return;
2446 }
2447
2448 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2449 {
2450         TDB_DATA result;
2451         int32_t ret;
2452         struct ip_reallocate_list *callers;
2453         uint32_t culprit;
2454
2455         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2456
2457         /* update the list of public ips that a node can handle for
2458            all connected nodes
2459         */
2460         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2461         if (ret != 0) {
2462                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2463                                  culprit));
2464                 rec->need_takeover_run = true;
2465         }
2466         if (ret == 0) {
2467                 if (do_takeover_run(rec, rec->nodemap, false)) {
2468                         ret = 0;
2469                 } else {
2470                         ret = -1;
2471                 }
2472         }
2473
2474         result.dsize = sizeof(int32_t);
2475         result.dptr  = (uint8_t *)&ret;
2476
2477         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2478
2479                 /* Someone that sent srvid==0 does not want a reply */
2480                 if (callers->rd->srvid == 0) {
2481                         continue;
2482                 }
2483                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2484                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2485                                   (unsigned long long)callers->rd->srvid));
2486                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2487                 if (ret != 0) {
2488                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2489                                          "message to %u:%llu\n",
2490                                          (unsigned)callers->rd->pnn,
2491                                          (unsigned long long)callers->rd->srvid));
2492                 }
2493         }
2494
2495         talloc_free(rec->ip_reallocate_ctx);
2496         rec->ip_reallocate_ctx = NULL;
2497         rec->reallocate_callers = NULL;
2498 }
2499
2500
2501 /*
2502   handler for recovery master elections
2503 */
2504 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2505                              TDB_DATA data, void *private_data)
2506 {
2507         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2508         int ret;
2509         struct election_message *em = (struct election_message *)data.dptr;
2510         TALLOC_CTX *mem_ctx;
2511
2512         /* we got an election packet - update the timeout for the election */
2513         talloc_free(rec->election_timeout);
2514         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2515                                                 fast_start ?
2516                                                 timeval_current_ofs(0, 500000) :
2517                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2518                                                 ctdb_election_timeout, rec);
2519
2520         mem_ctx = talloc_new(ctdb);
2521
2522         /* someone called an election. check their election data
2523            and if we disagree and we would rather be the elected node, 
2524            send a new election message to all other nodes
2525          */
2526         if (ctdb_election_win(rec, em)) {
2527                 if (!rec->send_election_te) {
2528                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2529                                                                 timeval_current_ofs(0, 500000),
2530                                                                 election_send_request, rec);
2531                 }
2532                 talloc_free(mem_ctx);
2533                 /*unban_all_nodes(ctdb);*/
2534                 return;
2535         }
2536         
2537         /* we didn't win */
2538         talloc_free(rec->send_election_te);
2539         rec->send_election_te = NULL;
2540
2541         if (ctdb->tunable.verify_recovery_lock != 0) {
2542                 /* release the recmaster lock */
2543                 if (em->pnn != ctdb->pnn &&
2544                     ctdb->recovery_lock_fd != -1) {
2545                         close(ctdb->recovery_lock_fd);
2546                         ctdb->recovery_lock_fd = -1;
2547                         unban_all_nodes(ctdb);
2548                 }
2549         }
2550
2551         /* ok, let that guy become recmaster then */
2552         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2553         if (ret != 0) {
2554                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2555                 talloc_free(mem_ctx);
2556                 return;
2557         }
2558
2559         talloc_free(mem_ctx);
2560         return;
2561 }
2562
2563
2564 /*
2565   force the start of the election process
2566  */
2567 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2568                            struct ctdb_node_map *nodemap)
2569 {
2570         int ret;
2571         struct ctdb_context *ctdb = rec->ctdb;
2572
2573         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2574
2575         /* set all nodes to recovery mode to stop all internode traffic */
2576         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2577         if (ret != 0) {
2578                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2579                 return;
2580         }
2581
2582         talloc_free(rec->election_timeout);
2583         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2584                                                 fast_start ?
2585                                                 timeval_current_ofs(0, 500000) :
2586                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2587                                                 ctdb_election_timeout, rec);
2588
2589         ret = send_election_request(rec, pnn, true);
2590         if (ret!=0) {
2591                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2592                 return;
2593         }
2594
2595         /* wait for a few seconds to collect all responses */
2596         ctdb_wait_election(rec);
2597 }
2598
2599
2600
2601 /*
2602   handler for when a node changes its flags
2603 */
2604 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2605                             TDB_DATA data, void *private_data)
2606 {
2607         int ret;
2608         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2609         struct ctdb_node_map *nodemap=NULL;
2610         TALLOC_CTX *tmp_ctx;
2611         int i;
2612         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2613         int disabled_flag_changed;
2614
2615         if (data.dsize != sizeof(*c)) {
2616                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2617                 return;
2618         }
2619
2620         tmp_ctx = talloc_new(ctdb);
2621         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2622
2623         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2624         if (ret != 0) {
2625                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2626                 talloc_free(tmp_ctx);
2627                 return;         
2628         }
2629
2630
2631         for (i=0;i<nodemap->num;i++) {
2632                 if (nodemap->nodes[i].pnn == c->pnn) break;
2633         }
2634
2635         if (i == nodemap->num) {
2636                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2637                 talloc_free(tmp_ctx);
2638                 return;
2639         }
2640
2641         if (c->old_flags != c->new_flags) {
2642                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2643         }
2644
2645         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2646
2647         nodemap->nodes[i].flags = c->new_flags;
2648
2649         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2650                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2651
2652         if (ret == 0) {
2653                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2654                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2655         }
2656         
2657         if (ret == 0 &&
2658             ctdb->recovery_master == ctdb->pnn &&
2659             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2660                 /* Only do the takeover run if the perm disabled or unhealthy
2661                    flags changed since these will cause an ip failover but not
2662                    a recovery.
2663                    If the node became disconnected or banned this will also
2664                    lead to an ip address failover but that is handled 
2665                    during recovery
2666                 */
2667                 if (disabled_flag_changed) {
2668                         rec->need_takeover_run = true;
2669                 }
2670         }
2671
2672         talloc_free(tmp_ctx);
2673 }
2674
2675 /*
2676   handler for when we need to push out flag changes ot all other nodes
2677 */
2678 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2679                             TDB_DATA data, void *private_data)
2680 {
2681         int ret;
2682         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2683         struct ctdb_node_map *nodemap=NULL;
2684         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2685         uint32_t recmaster;
2686         uint32_t *nodes;
2687
2688         /* find the recovery master */
2689         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2690         if (ret != 0) {
2691                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2692                 talloc_free(tmp_ctx);
2693                 return;
2694         }
2695
2696         /* read the node flags from the recmaster */
2697         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2698         if (ret != 0) {
2699                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2700                 talloc_free(tmp_ctx);
2701                 return;
2702         }
2703         if (c->pnn >= nodemap->num) {
2704                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2705                 talloc_free(tmp_ctx);
2706                 return;
2707         }
2708
2709         /* send the flags update to all connected nodes */
2710         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2711
2712         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2713                                       nodes, 0, CONTROL_TIMEOUT(),
2714                                       false, data,
2715                                       NULL, NULL,
2716                                       NULL) != 0) {
2717                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2718
2719                 talloc_free(tmp_ctx);
2720                 return;
2721         }
2722
2723         talloc_free(tmp_ctx);
2724 }
2725
2726
2727 struct verify_recmode_normal_data {
2728         uint32_t count;
2729         enum monitor_result status;
2730 };
2731
2732 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2733 {
2734         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2735
2736
2737         /* one more node has responded with recmode data*/
2738         rmdata->count--;
2739
2740         /* if we failed to get the recmode, then return an error and let
2741            the main loop try again.
2742         */
2743         if (state->state != CTDB_CONTROL_DONE) {
2744                 if (rmdata->status == MONITOR_OK) {
2745                         rmdata->status = MONITOR_FAILED;
2746                 }
2747                 return;
2748         }
2749
2750         /* if we got a response, then the recmode will be stored in the
2751            status field
2752         */
2753         if (state->status != CTDB_RECOVERY_NORMAL) {
2754                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2755                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2756         }
2757
2758         return;
2759 }
2760
2761
2762 /* verify that all nodes are in normal recovery mode */
2763 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2764 {
2765         struct verify_recmode_normal_data *rmdata;
2766         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2767         struct ctdb_client_control_state *state;
2768         enum monitor_result status;
2769         int j;
2770         
2771         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2772         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2773         rmdata->count  = 0;
2774         rmdata->status = MONITOR_OK;
2775
2776         /* loop over all active nodes and send an async getrecmode call to 
2777            them*/
2778         for (j=0; j<nodemap->num; j++) {
2779                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2780                         continue;
2781                 }
2782                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2783                                         CONTROL_TIMEOUT(), 
2784                                         nodemap->nodes[j].pnn);
2785                 if (state == NULL) {
2786                         /* we failed to send the control, treat this as 
2787                            an error and try again next iteration
2788                         */                      
2789                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2790                         talloc_free(mem_ctx);
2791                         return MONITOR_FAILED;
2792                 }
2793
2794                 /* set up the callback functions */
2795                 state->async.fn = verify_recmode_normal_callback;
2796                 state->async.private_data = rmdata;
2797
2798                 /* one more control to wait for to complete */
2799                 rmdata->count++;
2800         }
2801
2802
2803         /* now wait for up to the maximum number of seconds allowed
2804            or until all nodes we expect a response from has replied
2805         */
2806         while (rmdata->count > 0) {
2807                 event_loop_once(ctdb->ev);
2808         }
2809
2810         status = rmdata->status;
2811         talloc_free(mem_ctx);
2812         return status;
2813 }
2814
2815
2816 struct verify_recmaster_data {
2817         struct ctdb_recoverd *rec;
2818         uint32_t count;
2819         uint32_t pnn;
2820         enum monitor_result status;
2821 };
2822
2823 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2824 {
2825         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2826
2827
2828         /* one more node has responded with recmaster data*/
2829         rmdata->count--;
2830
2831         /* if we failed to get the recmaster, then return an error and let
2832            the main loop try again.
2833         */
2834         if (state->state != CTDB_CONTROL_DONE) {
2835                 if (rmdata->status == MONITOR_OK) {
2836                         rmdata->status = MONITOR_FAILED;
2837                 }
2838                 return;
2839         }
2840
2841         /* if we got a response, then the recmaster will be stored in the
2842            status field
2843         */
2844         if (state->status != rmdata->pnn) {
2845                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2846                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2847                 rmdata->status = MONITOR_ELECTION_NEEDED;
2848         }
2849
2850         return;
2851 }
2852
2853
2854 /* verify that all nodes agree that we are the recmaster */
2855 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2856 {
2857         struct ctdb_context *ctdb = rec->ctdb;
2858         struct verify_recmaster_data *rmdata;
2859         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2860         struct ctdb_client_control_state *state;
2861         enum monitor_result status;
2862         int j;
2863         
2864         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2865         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2866         rmdata->rec    = rec;
2867         rmdata->count  = 0;
2868         rmdata->pnn    = pnn;
2869         rmdata->status = MONITOR_OK;
2870
2871         /* loop over all active nodes and send an async getrecmaster call to 
2872            them*/
2873         for (j=0; j<nodemap->num; j++) {
2874                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2875                         continue;
2876                 }
2877                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2878                                         CONTROL_TIMEOUT(),
2879                                         nodemap->nodes[j].pnn);
2880                 if (state == NULL) {
2881                         /* we failed to send the control, treat this as 
2882                            an error and try again next iteration
2883                         */                      
2884                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2885                         talloc_free(mem_ctx);
2886                         return MONITOR_FAILED;
2887                 }
2888
2889                 /* set up the callback functions */
2890                 state->async.fn = verify_recmaster_callback;
2891                 state->async.private_data = rmdata;
2892
2893                 /* one more control to wait for to complete */
2894                 rmdata->count++;
2895         }
2896
2897
2898         /* now wait for up to the maximum number of seconds allowed
2899            or until all nodes we expect a response from has replied
2900         */
2901         while (rmdata->count > 0) {
2902                 event_loop_once(ctdb->ev);
2903         }
2904
2905         status = rmdata->status;
2906         talloc_free(mem_ctx);
2907         return status;
2908 }
2909
2910 static bool interfaces_have_changed(struct ctdb_context *ctdb,
2911                                     struct ctdb_recoverd *rec)
2912 {
2913         struct ctdb_control_get_ifaces *ifaces = NULL;
2914         TALLOC_CTX *mem_ctx;
2915         bool ret = false;
2916
2917         mem_ctx = talloc_new(NULL);
2918
2919         /* Read the interfaces from the local node */
2920         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
2921                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
2922                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
2923                 /* We could return an error.  However, this will be
2924                  * rare so we'll decide that the interfaces have
2925                  * actually changed, just in case.
2926                  */
2927                 talloc_free(mem_ctx);
2928                 return true;
2929         }
2930
2931         if (!rec->ifaces) {
2932                 /* We haven't been here before so things have changed */
2933                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
2934                 ret = true;
2935         } else if (rec->ifaces->num != ifaces->num) {
2936                 /* Number of interfaces has changed */
2937                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
2938                                      rec->ifaces->num, ifaces->num));
2939                 ret = true;
2940         } else {
2941                 /* See if interface names or link states have changed */
2942                 int i;
2943                 for (i = 0; i < rec->ifaces->num; i++) {
2944                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
2945                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
2946                                 DEBUG(DEBUG_NOTICE,
2947                                       ("Interface in slot %d changed: %s => %s\n",
2948                                        i, iface->name, ifaces->ifaces[i].name));
2949                                 ret = true;
2950                                 break;
2951                         }
2952                         if (iface->link_state != ifaces->ifaces[i].link_state) {
2953                                 DEBUG(DEBUG_NOTICE,
2954                                       ("Interface %s changed state: %d => %d\n",
2955                                        iface->name, iface->link_state,
2956                                        ifaces->ifaces[i].link_state));
2957                                 ret = true;
2958                                 break;
2959                         }
2960                 }
2961         }
2962
2963         talloc_free(rec->ifaces);
2964         rec->ifaces = talloc_steal(rec, ifaces);
2965
2966         talloc_free(mem_ctx);
2967         return ret;
2968 }
2969
2970 /* called to check that the local allocation of public ip addresses is ok.
2971 */
2972 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2973 {
2974         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2975         struct ctdb_uptime *uptime1 = NULL;
2976         struct ctdb_uptime *uptime2 = NULL;
2977         int ret, j;
2978         bool need_takeover_run = false;
2979
2980         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2981                                 CTDB_CURRENT_NODE, &uptime1);
2982         if (ret != 0) {
2983                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2984                 talloc_free(mem_ctx);
2985                 return -1;
2986         }
2987
2988         if (interfaces_have_changed(ctdb, rec)) {
2989                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2990                                      "local node %u - force takeover run\n",
2991                                      pnn));
2992                 need_takeover_run = true;
2993         }
2994
2995         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2996                                 CTDB_CURRENT_NODE, &uptime2);
2997         if (ret != 0) {
2998                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2999                 talloc_free(mem_ctx);
3000                 return -1;
3001         }
3002
3003         /* skip the check if the startrecovery time has changed */
3004         if (timeval_compare(&uptime1->last_recovery_started,
3005                             &uptime2->last_recovery_started) != 0) {
3006                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3007                 talloc_free(mem_ctx);
3008                 return 0;
3009         }
3010
3011         /* skip the check if the endrecovery time has changed */
3012         if (timeval_compare(&uptime1->last_recovery_finished,
3013                             &uptime2->last_recovery_finished) != 0) {
3014                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3015                 talloc_free(mem_ctx);
3016                 return 0;
3017         }
3018
3019         /* skip the check if we have started but not finished recovery */
3020         if (timeval_compare(&uptime1->last_recovery_finished,
3021                             &uptime1->last_recovery_started) != 1) {
3022                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
3023                 talloc_free(mem_ctx);
3024
3025                 return 0;
3026         }
3027
3028         /* verify that we have the ip addresses we should have
3029            and we dont have ones we shouldnt have.
3030            if we find an inconsistency we set recmode to
3031            active on the local node and wait for the recmaster
3032            to do a full blown recovery.
3033            also if the pnn is -1 and we are healthy and can host the ip
3034            we also request a ip reallocation.
3035         */
3036         if (ctdb->tunable.disable_ip_failover == 0) {
3037                 struct ctdb_all_public_ips *ips = NULL;
3038
3039                 /* read the *available* IPs from the local node */
3040                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3041                 if (ret != 0) {
3042                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3043                         talloc_free(mem_ctx);
3044                         return -1;
3045                 }
3046
3047                 for (j=0; j<ips->num; j++) {
3048                         if (ips->ips[j].pnn == -1 &&
3049                             nodemap->nodes[pnn].flags == 0) {
3050                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3051                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3052                                 need_takeover_run = true;
3053                         }
3054                 }
3055
3056                 talloc_free(ips);
3057
3058                 /* read the *known* IPs from the local node */
3059                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3060                 if (ret != 0) {
3061                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3062                         talloc_free(mem_ctx);
3063                         return -1;
3064                 }
3065
3066                 for (j=0; j<ips->num; j++) {
3067                         if (ips->ips[j].pnn == pnn) {
3068                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3069                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3070                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3071                                         need_takeover_run = true;
3072                                 }
3073                         } else {
3074                                 if (ctdb->do_checkpublicip &&
3075                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3076
3077                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3078                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3079
3080                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3081                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3082                                         }
3083                                 }
3084                         }
3085                 }
3086         }
3087
3088         if (need_takeover_run) {
3089                 struct takeover_run_reply rd;
3090                 TDB_DATA data;
3091
3092                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3093
3094                 rd.pnn = ctdb->pnn;
3095                 rd.srvid = 0;
3096                 data.dptr = (uint8_t *)&rd;
3097                 data.dsize = sizeof(rd);
3098
3099                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3100                 if (ret != 0) {
3101                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3102                 }
3103         }
3104         talloc_free(mem_ctx);
3105         return 0;
3106 }
3107
3108
3109 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3110 {
3111         struct ctdb_node_map **remote_nodemaps = callback_data;
3112
3113         if (node_pnn >= ctdb->num_nodes) {
3114                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3115                 return;
3116         }
3117
3118         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3119
3120 }
3121
3122 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3123         struct ctdb_node_map *nodemap,
3124         struct ctdb_node_map **remote_nodemaps)
3125 {
3126         uint32_t *nodes;
3127
3128         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3129         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3130                                         nodes, 0,
3131                                         CONTROL_TIMEOUT(), false, tdb_null,
3132                                         async_getnodemap_callback,
3133                                         NULL,
3134                                         remote_nodemaps) != 0) {
3135                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3136
3137                 return -1;
3138         }
3139
3140         return 0;
3141 }
3142
3143 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
3144 struct ctdb_check_reclock_state {
3145         struct ctdb_context *ctdb;
3146         struct timeval start_time;
3147         int fd[2];
3148         pid_t child;
3149         struct timed_event *te;
3150         struct fd_event *fde;
3151         enum reclock_child_status status;
3152 };
3153
3154 /* when we free the reclock state we must kill any child process.
3155 */
3156 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
3157 {
3158         struct ctdb_context *ctdb = state->ctdb;
3159
3160         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
3161
3162         if (state->fd[0] != -1) {
3163                 close(state->fd[0]);
3164                 state->fd[0] = -1;
3165         }
3166         if (state->fd[1] != -1) {
3167                 close(state->fd[1]);
3168                 state->fd[1] = -1;
3169         }
3170         ctdb_kill(ctdb, state->child, SIGKILL);
3171         return 0;
3172 }
3173
3174 /*
3175   called if our check_reclock child times out. this would happen if
3176   i/o to the reclock file blocks.
3177  */
3178 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
3179                                          struct timeval t, void *private_data)
3180 {
3181         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
3182                                            struct ctdb_check_reclock_state);
3183
3184         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
3185         state->status = RECLOCK_TIMEOUT;
3186 }
3187
3188 /* this is called when the child process has completed checking the reclock
3189    file and has written data back to us through the pipe.
3190 */
3191 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
3192                              uint16_t flags, void *private_data)
3193 {
3194         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
3195                                              struct ctdb_check_reclock_state);
3196         char c = 0;
3197         int ret;
3198
3199         /* we got a response from our child process so we can abort the
3200            timeout.
3201         */
3202         talloc_free(state->te);
3203         state->te = NULL;
3204
3205         ret = read(state->fd[0], &c, 1);
3206         if (ret != 1 || c != RECLOCK_OK) {
3207                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3208                 state->status = RECLOCK_FAILED;
3209
3210                 return;
3211         }
3212
3213         state->status = RECLOCK_OK;
3214         return;
3215 }
3216
3217 static int check_recovery_lock(struct ctdb_context *ctdb)
3218 {
3219         int ret;
3220         struct ctdb_check_reclock_state *state;
3221         pid_t parent = getpid();
3222
3223         if (ctdb->recovery_lock_fd == -1) {
3224                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3225                 return -1;
3226         }
3227
3228         state = talloc(ctdb, struct ctdb_check_reclock_state);
3229         CTDB_NO_MEMORY(ctdb, state);
3230
3231         state->ctdb = ctdb;
3232         state->start_time = timeval_current();
3233         state->status = RECLOCK_CHECKING;
3234         state->fd[0] = -1;
3235         state->fd[1] = -1;
3236
3237         ret = pipe(state->fd);
3238         if (ret != 0) {
3239                 talloc_free(state);
3240                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3241                 return -1;
3242         }
3243
3244         state->child = ctdb_fork(ctdb);
3245         if (state->child == (pid_t)-1) {
3246                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3247                 close(state->fd[0]);
3248                 state->fd[0] = -1;
3249                 close(state->fd[1]);
3250                 state->fd[1] = -1;
3251                 talloc_free(state);
3252                 return -1;
3253         }
3254
3255         if (state->child == 0) {
3256                 char cc = RECLOCK_OK;
3257                 close(state->fd[0]);
3258                 state->fd[0] = -1;
3259
3260                 ctdb_set_process_name("ctdb_rec_reclock");
3261                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3262                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3263                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3264                         cc = RECLOCK_FAILED;
3265                 }
3266
3267                 write(state->fd[1], &cc, 1);
3268                 /* make sure we die when our parent dies */
3269                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3270                         sleep(5);
3271                 }
3272                 _exit(0);
3273         }
3274         close(state->fd[1]);
3275         state->fd[1] = -1;
3276         set_close_on_exec(state->fd[0]);
3277
3278         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3279
3280         talloc_set_destructor(state, check_reclock_destructor);
3281
3282         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3283                                     ctdb_check_reclock_timeout, state);
3284         if (state->te == NULL) {
3285                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3286                 talloc_free(state);
3287                 return -1;
3288         }
3289
3290         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3291                                 EVENT_FD_READ,
3292                                 reclock_child_handler,
3293                                 (void *)state);
3294
3295         if (state->fde == NULL) {
3296                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3297                 talloc_free(state);
3298                 return -1;
3299         }
3300         tevent_fd_set_auto_close(state->fde);
3301
3302         while (state->status == RECLOCK_CHECKING) {
3303                 event_loop_once(ctdb->ev);
3304         }
3305
3306         if (state->status == RECLOCK_FAILED) {
3307                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3308                 close(ctdb->recovery_lock_fd);
3309                 ctdb->recovery_lock_fd = -1;
3310                 talloc_free(state);
3311                 return -1;
3312         }
3313
3314         talloc_free(state);
3315         return 0;
3316 }
3317
3318 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3319 {
3320         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3321         const char *reclockfile;
3322
3323         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3324                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3325                 talloc_free(tmp_ctx);
3326                 return -1;      
3327         }
3328
3329         if (reclockfile == NULL) {
3330                 if (ctdb->recovery_lock_file != NULL) {
3331                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3332                         talloc_free(ctdb->recovery_lock_file);
3333                         ctdb->recovery_lock_file = NULL;
3334                         if (ctdb->recovery_lock_fd != -1) {
3335                                 close(ctdb->recovery_lock_fd);
3336                                 ctdb->recovery_lock_fd = -1;
3337                         }
3338                 }
3339                 ctdb->tunable.verify_recovery_lock = 0;
3340                 talloc_free(tmp_ctx);
3341                 return 0;
3342         }
3343
3344         if (ctdb->recovery_lock_file == NULL) {
3345                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3346                 if (ctdb->recovery_lock_fd != -1) {
3347                         close(ctdb->recovery_lock_fd);
3348                         ctdb->recovery_lock_fd = -1;
3349                 }
3350                 talloc_free(tmp_ctx);
3351                 return 0;
3352         }
3353
3354
3355         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3356                 talloc_free(tmp_ctx);
3357                 return 0;
3358         }
3359
3360         talloc_free(ctdb->recovery_lock_file);
3361         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3362         ctdb->tunable.verify_recovery_lock = 0;
3363         if (ctdb->recovery_lock_fd != -1) {
3364                 close(ctdb->recovery_lock_fd);
3365                 ctdb->recovery_lock_fd = -1;
3366         }
3367
3368         talloc_free(tmp_ctx);
3369         return 0;
3370 }
3371
3372 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3373                       TALLOC_CTX *mem_ctx)
3374 {
3375         uint32_t pnn;
3376         struct ctdb_node_map *nodemap=NULL;
3377         struct ctdb_node_map *recmaster_nodemap=NULL;
3378         struct ctdb_node_map **remote_nodemaps=NULL;
3379         struct ctdb_vnn_map *vnnmap=NULL;
3380         struct ctdb_vnn_map *remote_vnnmap=NULL;
3381         int32_t debug_level;
3382         int i, j, ret;
3383         bool self_ban;
3384
3385
3386         /* verify that the main daemon is still running */
3387         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3388                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3389                 exit(-1);
3390         }
3391
3392         /* ping the local daemon to tell it we are alive */
3393         ctdb_ctrl_recd_ping(ctdb);
3394
3395         if (rec->election_timeout) {
3396                 /* an election is in progress */
3397                 return;
3398         }
3399
3400         /* read the debug level from the parent and update locally */
3401         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3402         if (ret !=0) {
3403                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3404                 return;
3405         }
3406         LogLevel = debug_level;
3407
3408         /* get relevant tunables */
3409         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3410         if (ret != 0) {
3411                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3412                 return;
3413         }
3414
3415         /* get the current recovery lock file from the server */
3416         if (update_recovery_lock_file(ctdb) != 0) {
3417                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3418                 return;
3419         }
3420
3421         /* Make sure that if recovery lock verification becomes disabled when
3422            we close the file
3423         */
3424         if (ctdb->tunable.verify_recovery_lock == 0) {
3425                 if (ctdb->recovery_lock_fd != -1) {
3426                         close(ctdb->recovery_lock_fd);
3427                         ctdb->recovery_lock_fd = -1;
3428                 }
3429         }
3430
3431         pnn = ctdb_get_pnn(ctdb);
3432
3433         /* get the vnnmap */
3434         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3435         if (ret != 0) {
3436                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3437                 return;
3438         }
3439
3440
3441         /* get number of nodes */
3442         if (rec->nodemap) {
3443                 talloc_free(rec->nodemap);
3444                 rec->nodemap = NULL;
3445                 nodemap=NULL;
3446         }
3447         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3448         if (ret != 0) {
3449                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3450                 return;
3451         }
3452         nodemap = rec->nodemap;
3453
3454         /* remember our own node flags */
3455         rec->node_flags = nodemap->nodes[pnn].flags;
3456
3457         ban_misbehaving_nodes(rec, &self_ban);
3458         if (self_ban) {
3459                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3460                 return;
3461         }
3462
3463         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3464            also frozen and that the recmode is set to active.
3465         */
3466         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3467                 /* If this node has become inactive then we want to
3468                  * reduce the chances of it taking over the recovery
3469                  * master role when it becomes active again.  This
3470                  * helps to stabilise the recovery master role so that
3471                  * it stays on the most stable node.
3472                  */
3473                 rec->priority_time = timeval_current();
3474
3475                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3476                 if (ret != 0) {
3477                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3478                 }
3479                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3480                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3481
3482                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3483                         if (ret != 0) {
3484                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3485                                 return;
3486                         }
3487                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3488                         if (ret != 0) {
3489                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3490
3491                                 return;
3492                         }
3493                 }
3494
3495                 /* If this node is stopped or banned then it is not the recovery
3496                  * master, so don't do anything. This prevents stopped or banned
3497                  * node from starting election and sending unnecessary controls.
3498                  */
3499                 return;
3500         }
3501
3502         /* check which node is the recovery master */
3503         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3504         if (ret != 0) {
3505                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3506                 return;
3507         }
3508
3509         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3510         if (rec->recmaster != pnn) {
3511                 if (rec->ip_reallocate_ctx != NULL) {
3512                         talloc_free(rec->ip_reallocate_ctx);
3513                         rec->ip_reallocate_ctx = NULL;
3514                         rec->reallocate_callers = NULL;
3515                 }
3516         }
3517
3518         /* This is a special case.  When recovery daemon is started, recmaster
3519          * is set to -1.  If a node is not started in stopped state, then
3520          * start election to decide recovery master
3521          */
3522         if (rec->recmaster == (uint32_t)-1) {
3523                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3524                 force_election(rec, pnn, nodemap);
3525                 return;
3526         }
3527
3528         /* update the capabilities for all nodes */
3529         ret = update_capabilities(ctdb, nodemap);
3530         if (ret != 0) {
3531                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3532                 return;
3533         }
3534
3535         /*
3536          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3537          * but we have, then force an election and try to become the new
3538          * recmaster.
3539          */
3540         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3541             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3542              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3543                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3544                                   " but we (node %u) have - force an election\n",
3545                                   rec->recmaster, pnn));
3546                 force_election(rec, pnn, nodemap);
3547                 return;
3548         }
3549
3550         /* count how many active nodes there are */
3551         rec->num_active    = 0;
3552         rec->num_connected = 0;
3553         for (i=0; i<nodemap->num; i++) {
3554                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3555                         rec->num_active++;
3556                 }
3557                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3558                         rec->num_connected++;
3559                 }
3560         }
3561
3562
3563         /* verify that the recmaster node is still active */
3564         for (j=0; j<nodemap->num; j++) {
3565                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3566                         break;
3567                 }
3568         }
3569
3570         if (j == nodemap->num) {
3571                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3572                 force_election(rec, pnn, nodemap);
3573                 return;
3574         }
3575
3576         /* if recovery master is disconnected we must elect a new recmaster */
3577         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3578                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3579                 force_election(rec, pnn, nodemap);
3580                 return;
3581         }
3582
3583         /* get nodemap from the recovery master to check if it is inactive */
3584         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3585                                    mem_ctx, &recmaster_nodemap);
3586         if (ret != 0) {
3587                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3588                           nodemap->nodes[j].pnn));
3589                 return;
3590         }
3591
3592
3593         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3594             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3595                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3596                 /*
3597                  * update our nodemap to carry the recmaster's notion of
3598                  * its own flags, so that we don't keep freezing the
3599                  * inactive recmaster node...
3600                  */
3601                 nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
3602                 force_election(rec, pnn, nodemap);
3603                 return;
3604         }
3605
3606         /* verify that we have all ip addresses we should have and we dont
3607          * have addresses we shouldnt have.
3608          */ 
3609         if (ctdb->tunable.disable_ip_failover == 0) {
3610                 if (rec->ip_check_disable_ctx == NULL) {
3611                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3612                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3613                         }
3614                 }
3615         }
3616
3617
3618         /* if we are not the recmaster then we do not need to check
3619            if recovery is needed
3620          */
3621         if (pnn != rec->recmaster) {
3622                 return;
3623         }
3624
3625
3626         /* ensure our local copies of flags are right */
3627         ret = update_local_flags(rec, nodemap);
3628         if (ret == MONITOR_ELECTION_NEEDED) {
3629                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3630                 force_election(rec, pnn, nodemap);
3631                 return;
3632         }
3633         if (ret != MONITOR_OK) {
3634                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3635                 return;
3636         }
3637
3638         if (ctdb->num_nodes != nodemap->num) {
3639                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3640                 reload_nodes_file(ctdb);
3641                 return;
3642         }
3643
3644         /* verify that all active nodes agree that we are the recmaster */
3645         switch (verify_recmaster(rec, nodemap, pnn)) {
3646         case MONITOR_RECOVERY_NEEDED:
3647                 /* can not happen */
3648                 return;
3649         case MONITOR_ELECTION_NEEDED:
3650                 force_election(rec, pnn, nodemap);
3651                 return;
3652         case MONITOR_OK:
3653                 break;
3654         case MONITOR_FAILED:
3655                 return;
3656         }
3657
3658
3659         if (rec->need_recovery) {
3660                 /* a previous recovery didn't finish */
3661                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3662                 return;
3663         }
3664
3665         /* verify that all active nodes are in normal mode 
3666            and not in recovery mode 
3667         */
3668         switch (verify_recmode(ctdb, nodemap)) {
3669         case MONITOR_RECOVERY_NEEDED:
3670                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3671                 return;
3672         case MONITOR_FAILED:
3673                 return;
3674         case MONITOR_ELECTION_NEEDED:
3675                 /* can not happen */
3676         case MONITOR_OK:
3677                 break;
3678         }
3679
3680
3681         if (ctdb->tunable.verify_recovery_lock != 0) {
3682                 /* we should have the reclock - check its not stale */
3683                 ret = check_recovery_lock(ctdb);
3684                 if (ret != 0) {
3685                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3686                         ctdb_set_culprit(rec, ctdb->pnn);
3687                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3688                         return;
3689                 }
3690         }
3691
3692
3693         /* is there a pending reload all ips ? */
3694         if (reload_all_ips_request != NULL) {
3695                 reload_all_ips(ctdb, rec, nodemap, reload_all_ips_request);
3696                 talloc_free(reload_all_ips_request);
3697                 reload_all_ips_request = NULL;
3698         }
3699
3700         /* if there are takeovers requested, perform it and notify the waiters */
3701         if (rec->reallocate_callers) {
3702                 process_ipreallocate_requests(ctdb, rec);
3703         }
3704
3705         /* get the nodemap for all active remote nodes
3706          */
3707         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3708         if (remote_nodemaps == NULL) {
3709                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3710                 return;
3711         }
3712         for(i=0; i<nodemap->num; i++) {
3713                 remote_nodemaps[i] = NULL;
3714         }
3715         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3716                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3717                 return;
3718         } 
3719
3720         /* verify that all other nodes have the same nodemap as we have
3721         */
3722         for (j=0; j<nodemap->num; j++) {
3723                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3724                         continue;
3725                 }
3726
3727                 if (remote_nodemaps[j] == NULL) {
3728                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3729                         ctdb_set_culprit(rec, j);
3730
3731                         return;
3732                 }
3733
3734                 /* if the nodes disagree on how many nodes there are
3735                    then this is a good reason to try recovery
3736                  */
3737                 if (remote_nodemaps[j]->num != nodemap->num) {
3738                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3739                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3740                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3741                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3742                         return;
3743                 }
3744
3745                 /* if the nodes disagree on which nodes exist and are
3746                    active, then that is also a good reason to do recovery
3747                  */
3748                 for (i=0;i<nodemap->num;i++) {
3749                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3750                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3751                                           nodemap->nodes[j].pnn, i, 
3752                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3753                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3754                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3755                                             vnnmap);
3756                                 return;
3757                         }
3758                 }
3759         }
3760
3761         /*
3762          * Update node flags obtained from each active node. This ensure we have
3763          * up-to-date information for all the nodes.
3764          */
3765         for (j=0; j<nodemap->num; j++) {
3766                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3767                         continue;
3768                 }
3769                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3770         }
3771
3772         for (j=0; j<nodemap->num; j++) {
3773                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3774                         continue;
3775                 }
3776
3777                 /* verify the flags are consistent
3778                 */
3779                 for (i=0; i<nodemap->num; i++) {
3780                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3781                                 continue;
3782                         }
3783                         
3784                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3785                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3786                                   nodemap->nodes[j].pnn, 
3787                                   nodemap->nodes[i].pnn, 
3788                                   remote_nodemaps[j]->nodes[i].flags,
3789                                   nodemap->nodes[i].flags));
3790                                 if (i == j) {
3791                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3792                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3793                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3794                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3795                                                     vnnmap);
3796                                         return;
3797                                 } else {
3798                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3799                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3800                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3801                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3802                                                     vnnmap);
3803                                         return;
3804                                 }
3805                         }
3806                 }
3807         }
3808
3809
3810         /* there better be the same number of lmasters in the vnn map
3811            as there are active nodes or we will have to do a recovery
3812          */
3813         if (vnnmap->size != rec->num_active) {
3814                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3815                           vnnmap->size, rec->num_active));
3816                 ctdb_set_culprit(rec, ctdb->pnn);
3817                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3818                 return;
3819         }
3820
3821         /* verify that all active nodes in the nodemap also exist in 
3822            the vnnmap.
3823          */
3824         for (j=0; j<nodemap->num; j++) {
3825                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3826                         continue;
3827                 }
3828                 if (nodemap->nodes[j].pnn == pnn) {
3829                         continue;
3830                 }
3831
3832                 for (i=0; i<vnnmap->size; i++) {
3833                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3834                                 break;
3835                         }
3836                 }
3837                 if (i == vnnmap->size) {
3838                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3839                                   nodemap->nodes[j].pnn));
3840                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3841                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3842                         return;
3843                 }
3844         }
3845
3846         
3847         /* verify that all other nodes have the same vnnmap
3848            and are from the same generation
3849          */
3850         for (j=0; j<nodemap->num; j++) {
3851                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3852                         continue;
3853                 }
3854                 if (nodemap->nodes[j].pnn == pnn) {
3855                         continue;
3856                 }
3857
3858                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3859                                           mem_ctx, &remote_vnnmap);
3860                 if (ret != 0) {
3861                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3862                                   nodemap->nodes[j].pnn));
3863                         return;
3864                 }
3865
3866                 /* verify the vnnmap generation is the same */
3867                 if (vnnmap->generation != remote_vnnmap->generation) {
3868                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3869                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3870                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3871                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3872                         return;
3873                 }
3874
3875                 /* verify the vnnmap size is the same */
3876                 if (vnnmap->size != remote_vnnmap->size) {
3877                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3878                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3879                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3880                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3881                         return;
3882                 }
3883
3884                 /* verify the vnnmap is the same */
3885                 for (i=0;i<vnnmap->size;i++) {
3886                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3887                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3888                                           nodemap->nodes[j].pnn));
3889                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3890                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3891                                             vnnmap);
3892                                 return;
3893                         }
3894                 }
3895         }
3896
3897         /* we might need to change who has what IP assigned */
3898         if (rec->need_takeover_run) {
3899                 uint32_t culprit = (uint32_t)-1;
3900
3901                 rec->need_takeover_run = false;
3902
3903                 /* update the list of public ips that a node can handle for
3904                    all connected nodes
3905                 */
3906                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3907                 if (ret != 0) {
3908                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3909                                          culprit));
3910                         rec->need_takeover_run = true;
3911                         return;
3912                 }
3913
3914                 /* execute the "startrecovery" event script on all nodes */
3915                 ret = run_startrecovery_eventscript(rec, nodemap);
3916                 if (ret!=0) {
3917                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3918                         ctdb_set_culprit(rec, ctdb->pnn);
3919                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3920                         return;
3921                 }
3922
3923                 /* If takeover run fails, then the offending nodes are
3924                  * assigned ban culprit counts. And we re-try takeover.
3925                  * If takeover run fails repeatedly, the node would get
3926                  * banned.
3927                  *
3928                  * If rec->need_takeover_run is not set to true at this
3929                  * failure, monitoring is disabled cluster-wide (via
3930                  * startrecovery eventscript) and will not get enabled.
3931                  */
3932                 if (!do_takeover_run(rec, nodemap, true)) {
3933                         return;
3934                 }
3935
3936                 /* execute the "recovered" event script on all nodes */
3937                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
3938 #if 0
3939 // we cant check whether the event completed successfully
3940 // since this script WILL fail if the node is in recovery mode
3941 // and if that race happens, the code here would just cause a second
3942 // cascading recovery.
3943                 if (ret!=0) {
3944                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3945                         ctdb_set_culprit(rec, ctdb->pnn);
3946                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3947                 }
3948 #endif
3949         }
3950 }
3951
3952 /*
3953   the main monitoring loop
3954  */
3955 static void monitor_cluster(struct ctdb_context *ctdb)
3956 {
3957         struct ctdb_recoverd *rec;
3958
3959         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3960
3961         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3962         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3963
3964         rec->ctdb = ctdb;
3965
3966         rec->takeover_run_in_progress = false;
3967
3968         rec->priority_time = timeval_current();
3969
3970         /* register a message port for sending memory dumps */
3971         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3972
3973         /* register a message port for requesting logs */
3974         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_GETLOG, getlog_handler, rec);
3975
3976         /* register a message port for clearing logs */
3977         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_CLEARLOG, clearlog_handler, rec);
3978
3979         /* register a message port for recovery elections */
3980         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3981
3982         /* when nodes are disabled/enabled */
3983         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3984
3985         /* when we are asked to puch out a flag change */
3986         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3987
3988         /* register a message port for vacuum fetch */
3989         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3990
3991         /* register a message port for reloadnodes  */
3992         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3993
3994         /* register a message port for performing a takeover run */
3995         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3996
3997         /* register a message port for performing a reload all ips */
3998         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_ALL_IPS, ip_reloadall_handler, rec);
3999
4000         /* register a message port for disabling the ip check for a short while */
4001         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
4002
4003         /* register a message port for updating the recovery daemons node assignment for an ip */
4004         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
4005
4006         /* register a message port for forcing a rebalance of a node next
4007            reallocation */
4008         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
4009
4010         for (;;) {
4011                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
4012                 struct timeval start;
4013                 double elapsed;
4014
4015                 if (!mem_ctx) {
4016                         DEBUG(DEBUG_CRIT,(__location__
4017                                           " Failed to create temp context\n"));
4018                         exit(-1);
4019                 }
4020
4021                 start = timeval_current();
4022                 main_loop(ctdb, rec, mem_ctx);
4023                 talloc_free(mem_ctx);
4024
4025                 /* we only check for recovery once every second */
4026                 elapsed = timeval_elapsed(&start);
4027                 if (elapsed < ctdb->tunable.recover_interval) {
4028                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
4029                                           - elapsed);
4030                 }
4031         }
4032 }
4033
4034 /*
4035   event handler for when the main ctdbd dies
4036  */
4037 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
4038                                  uint16_t flags, void *private_data)
4039 {
4040         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
4041         _exit(1);
4042 }
4043
4044 /*
4045   called regularly to verify that the recovery daemon is still running
4046  */
4047 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
4048                               struct timeval yt, void *p)
4049 {
4050         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
4051
4052         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
4053                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
4054
4055                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
4056                                 ctdb_restart_recd, ctdb);
4057
4058                 return;
4059         }
4060
4061         event_add_timed(ctdb->ev, ctdb->recd_ctx,
4062                         timeval_current_ofs(30, 0),
4063                         ctdb_check_recd, ctdb);
4064 }
4065
4066 static void recd_sig_child_handler(struct event_context *ev,
4067         struct signal_event *se, int signum, int count,
4068         void *dont_care, 
4069         void *private_data)
4070 {
4071 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4072         int status;
4073         pid_t pid = -1;
4074
4075         while (pid != 0) {
4076                 pid = waitpid(-1, &status, WNOHANG);
4077                 if (pid == -1) {
4078                         if (errno != ECHILD) {
4079                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4080                         }
4081                         return;
4082                 }
4083                 if (pid > 0) {
4084                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4085                 }
4086         }
4087 }
4088
4089 /*
4090   startup the recovery daemon as a child of the main ctdb daemon
4091  */
4092 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4093 {
4094         int fd[2];
4095         struct signal_event *se;
4096         struct tevent_fd *fde;
4097
4098         if (pipe(fd) != 0) {
4099                 return -1;
4100         }
4101
4102         ctdb->ctdbd_pid = getpid();
4103
4104         ctdb->recoverd_pid = ctdb_fork_no_free_ringbuffer(ctdb);
4105         if (ctdb->recoverd_pid == -1) {
4106                 return -1;
4107         }
4108
4109         if (ctdb->recoverd_pid != 0) {
4110                 talloc_free(ctdb->recd_ctx);
4111                 ctdb->recd_ctx = talloc_new(ctdb);
4112                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4113
4114                 close(fd[0]);
4115                 event_add_timed(ctdb->ev, ctdb->recd_ctx,
4116                                 timeval_current_ofs(30, 0),
4117                                 ctdb_check_recd, ctdb);
4118                 return 0;
4119         }
4120
4121         close(fd[1]);
4122
4123         srandom(getpid() ^ time(NULL));
4124
4125         /* Clear the log ringbuffer */
4126         ctdb_clear_log(ctdb);
4127
4128         ctdb_set_process_name("ctdb_recovered");
4129         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4130                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
4131                 exit(1);
4132         }
4133
4134         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4135
4136         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
4137                      ctdb_recoverd_parent, &fd[0]);
4138         tevent_fd_set_auto_close(fde);
4139
4140         /* set up a handler to pick up sigchld */
4141         se = event_add_signal(ctdb->ev, ctdb,
4142                                      SIGCHLD, 0,
4143                                      recd_sig_child_handler,
4144                                      ctdb);
4145         if (se == NULL) {
4146                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4147                 exit(1);
4148         }
4149
4150         monitor_cluster(ctdb);
4151
4152         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4153         return -1;
4154 }
4155
4156 /*
4157   shutdown the recovery daemon
4158  */
4159 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4160 {
4161         if (ctdb->recoverd_pid == 0) {
4162                 return;
4163         }
4164
4165         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4166         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4167
4168         TALLOC_FREE(ctdb->recd_ctx);
4169         TALLOC_FREE(ctdb->recd_ping_count);
4170 }
4171
4172 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
4173                        struct timeval t, void *private_data)
4174 {
4175         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4176
4177         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4178         ctdb_stop_recoverd(ctdb);
4179         ctdb_start_recoverd(ctdb);
4180 }