recoverd: Move IP flags into ctdb_takeover.c
[obnox/ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "dlinklist.h"
31
32
33 /* most recent reload all ips request we need to perform during the 
34    next monitoring loop
35 */
36 struct reloadips_all_reply *reload_all_ips_request = NULL;
37
38 /* list of "ctdb ipreallocate" processes to call back when we have
39    finished the takeover run.
40 */
41 struct ip_reallocate_list {
42         struct ip_reallocate_list *next;
43         struct rd_memdump_reply *rd;
44 };
45
46 struct ctdb_banning_state {
47         uint32_t count;
48         struct timeval last_reported_time;
49 };
50
51 /*
52   private state of recovery daemon
53  */
54 struct ctdb_recoverd {
55         struct ctdb_context *ctdb;
56         uint32_t recmaster;
57         uint32_t num_active;
58         uint32_t num_connected;
59         uint32_t last_culprit_node;
60         struct ctdb_node_map *nodemap;
61         struct timeval priority_time;
62         bool need_takeover_run;
63         bool need_recovery;
64         uint32_t node_flags;
65         struct timed_event *send_election_te;
66         struct timed_event *election_timeout;
67         struct vacuum_info *vacuum_info;
68         TALLOC_CTX *ip_reallocate_ctx;
69         struct ip_reallocate_list *reallocate_callers;
70         TALLOC_CTX *ip_check_disable_ctx;
71         struct ctdb_control_get_ifaces *ifaces;
72         TALLOC_CTX *deferred_rebalance_ctx;
73 };
74
75 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
76 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
77
78 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
79
80 /*
81   ban a node for a period of time
82  */
83 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
84 {
85         int ret;
86         struct ctdb_context *ctdb = rec->ctdb;
87         struct ctdb_ban_time bantime;
88        
89         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
90
91         if (!ctdb_validate_pnn(ctdb, pnn)) {
92                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
93                 return;
94         }
95
96         bantime.pnn  = pnn;
97         bantime.time = ban_time;
98
99         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
100         if (ret != 0) {
101                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
102                 return;
103         }
104
105 }
106
107 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
108
109
110 /*
111   remember the trouble maker
112  */
113 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
114 {
115         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
116         struct ctdb_banning_state *ban_state;
117
118         if (culprit > ctdb->num_nodes) {
119                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
120                 return;
121         }
122
123         if (ctdb->nodes[culprit]->ban_state == NULL) {
124                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
125                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
126
127                 
128         }
129         ban_state = ctdb->nodes[culprit]->ban_state;
130         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
131                 /* this was the first time in a long while this node
132                    misbehaved so we will forgive any old transgressions.
133                 */
134                 ban_state->count = 0;
135         }
136
137         ban_state->count += count;
138         ban_state->last_reported_time = timeval_current();
139         rec->last_culprit_node = culprit;
140 }
141
142 /*
143   remember the trouble maker
144  */
145 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
146 {
147         ctdb_set_culprit_count(rec, culprit, 1);
148 }
149
150
151 /* this callback is called for every node that failed to execute the
152    recovered event
153 */
154 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
155 {
156         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
157
158         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
159
160         ctdb_set_culprit(rec, node_pnn);
161 }
162
163 /*
164   run the "recovered" eventscript on all nodes
165  */
166 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
167 {
168         TALLOC_CTX *tmp_ctx;
169         uint32_t *nodes;
170         struct ctdb_context *ctdb = rec->ctdb;
171
172         tmp_ctx = talloc_new(ctdb);
173         CTDB_NO_MEMORY(ctdb, tmp_ctx);
174
175         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
176         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
177                                         nodes, 0,
178                                         CONTROL_TIMEOUT(), false, tdb_null,
179                                         NULL, recovered_fail_callback,
180                                         rec) != 0) {
181                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
182
183                 talloc_free(tmp_ctx);
184                 return -1;
185         }
186
187         talloc_free(tmp_ctx);
188         return 0;
189 }
190
191 /* this callback is called for every node that failed to execute the
192    start recovery event
193 */
194 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
195 {
196         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
197
198         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
199
200         ctdb_set_culprit(rec, node_pnn);
201 }
202
203 /*
204   run the "startrecovery" eventscript on all nodes
205  */
206 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
207 {
208         TALLOC_CTX *tmp_ctx;
209         uint32_t *nodes;
210         struct ctdb_context *ctdb = rec->ctdb;
211
212         tmp_ctx = talloc_new(ctdb);
213         CTDB_NO_MEMORY(ctdb, tmp_ctx);
214
215         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
216         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
217                                         nodes, 0,
218                                         CONTROL_TIMEOUT(), false, tdb_null,
219                                         NULL,
220                                         startrecovery_fail_callback,
221                                         rec) != 0) {
222                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
223                 talloc_free(tmp_ctx);
224                 return -1;
225         }
226
227         talloc_free(tmp_ctx);
228         return 0;
229 }
230
231 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
232 {
233         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
234                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
235                 return;
236         }
237         if (node_pnn < ctdb->num_nodes) {
238                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
239         }
240
241         if (node_pnn == ctdb->pnn) {
242                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
243         }
244 }
245
246 /*
247   update the node capabilities for all connected nodes
248  */
249 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
250 {
251         uint32_t *nodes;
252         TALLOC_CTX *tmp_ctx;
253
254         tmp_ctx = talloc_new(ctdb);
255         CTDB_NO_MEMORY(ctdb, tmp_ctx);
256
257         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
258         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
259                                         nodes, 0,
260                                         CONTROL_TIMEOUT(),
261                                         false, tdb_null,
262                                         async_getcap_callback, NULL,
263                                         NULL) != 0) {
264                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
265                 talloc_free(tmp_ctx);
266                 return -1;
267         }
268
269         talloc_free(tmp_ctx);
270         return 0;
271 }
272
273 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
274 {
275         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
276
277         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
278         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
279 }
280
281 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
282 {
283         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
284
285         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
286         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
287 }
288
289 /*
290   change recovery mode on all nodes
291  */
292 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
293 {
294         TDB_DATA data;
295         uint32_t *nodes;
296         TALLOC_CTX *tmp_ctx;
297
298         tmp_ctx = talloc_new(ctdb);
299         CTDB_NO_MEMORY(ctdb, tmp_ctx);
300
301         /* freeze all nodes */
302         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
303         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
304                 int i;
305
306                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
307                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
308                                                 nodes, i,
309                                                 CONTROL_TIMEOUT(),
310                                                 false, tdb_null,
311                                                 NULL,
312                                                 set_recmode_fail_callback,
313                                                 rec) != 0) {
314                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
315                                 talloc_free(tmp_ctx);
316                                 return -1;
317                         }
318                 }
319         }
320
321
322         data.dsize = sizeof(uint32_t);
323         data.dptr = (unsigned char *)&rec_mode;
324
325         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
326                                         nodes, 0,
327                                         CONTROL_TIMEOUT(),
328                                         false, data,
329                                         NULL, NULL,
330                                         NULL) != 0) {
331                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
332                 talloc_free(tmp_ctx);
333                 return -1;
334         }
335
336         talloc_free(tmp_ctx);
337         return 0;
338 }
339
340 /*
341   change recovery master on all node
342  */
343 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
344 {
345         TDB_DATA data;
346         TALLOC_CTX *tmp_ctx;
347         uint32_t *nodes;
348
349         tmp_ctx = talloc_new(ctdb);
350         CTDB_NO_MEMORY(ctdb, tmp_ctx);
351
352         data.dsize = sizeof(uint32_t);
353         data.dptr = (unsigned char *)&pnn;
354
355         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
356         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
357                                         nodes, 0,
358                                         CONTROL_TIMEOUT(), false, data,
359                                         NULL, NULL,
360                                         NULL) != 0) {
361                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
362                 talloc_free(tmp_ctx);
363                 return -1;
364         }
365
366         talloc_free(tmp_ctx);
367         return 0;
368 }
369
370 /* update all remote nodes to use the same db priority that we have
371    this can fail if the remove node has not yet been upgraded to 
372    support this function, so we always return success and never fail
373    a recovery if this call fails.
374 */
375 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
376         struct ctdb_node_map *nodemap, 
377         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
378 {
379         int db;
380         uint32_t *nodes;
381
382         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
383
384         /* step through all local databases */
385         for (db=0; db<dbmap->num;db++) {
386                 TDB_DATA data;
387                 struct ctdb_db_priority db_prio;
388                 int ret;
389
390                 db_prio.db_id     = dbmap->dbs[db].dbid;
391                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
392                 if (ret != 0) {
393                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
394                         continue;
395                 }
396
397                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
398
399                 data.dptr  = (uint8_t *)&db_prio;
400                 data.dsize = sizeof(db_prio);
401
402                 if (ctdb_client_async_control(ctdb,
403                                         CTDB_CONTROL_SET_DB_PRIORITY,
404                                         nodes, 0,
405                                         CONTROL_TIMEOUT(), false, data,
406                                         NULL, NULL,
407                                         NULL) != 0) {
408                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
409                 }
410         }
411
412         return 0;
413 }                       
414
415 /*
416   ensure all other nodes have attached to any databases that we have
417  */
418 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
419                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
420 {
421         int i, j, db, ret;
422         struct ctdb_dbid_map *remote_dbmap;
423
424         /* verify that all other nodes have all our databases */
425         for (j=0; j<nodemap->num; j++) {
426                 /* we dont need to ourself ourselves */
427                 if (nodemap->nodes[j].pnn == pnn) {
428                         continue;
429                 }
430                 /* dont check nodes that are unavailable */
431                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
432                         continue;
433                 }
434
435                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
436                                          mem_ctx, &remote_dbmap);
437                 if (ret != 0) {
438                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
439                         return -1;
440                 }
441
442                 /* step through all local databases */
443                 for (db=0; db<dbmap->num;db++) {
444                         const char *name;
445
446
447                         for (i=0;i<remote_dbmap->num;i++) {
448                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
449                                         break;
450                                 }
451                         }
452                         /* the remote node already have this database */
453                         if (i!=remote_dbmap->num) {
454                                 continue;
455                         }
456                         /* ok so we need to create this database */
457                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
458                                             mem_ctx, &name);
459                         if (ret != 0) {
460                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
461                                 return -1;
462                         }
463                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
464                                            mem_ctx, name,
465                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
466                         if (ret != 0) {
467                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
468                                 return -1;
469                         }
470                 }
471         }
472
473         return 0;
474 }
475
476
477 /*
478   ensure we are attached to any databases that anyone else is attached to
479  */
480 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
481                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
482 {
483         int i, j, db, ret;
484         struct ctdb_dbid_map *remote_dbmap;
485
486         /* verify that we have all database any other node has */
487         for (j=0; j<nodemap->num; j++) {
488                 /* we dont need to ourself ourselves */
489                 if (nodemap->nodes[j].pnn == pnn) {
490                         continue;
491                 }
492                 /* dont check nodes that are unavailable */
493                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
494                         continue;
495                 }
496
497                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
498                                          mem_ctx, &remote_dbmap);
499                 if (ret != 0) {
500                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
501                         return -1;
502                 }
503
504                 /* step through all databases on the remote node */
505                 for (db=0; db<remote_dbmap->num;db++) {
506                         const char *name;
507
508                         for (i=0;i<(*dbmap)->num;i++) {
509                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
510                                         break;
511                                 }
512                         }
513                         /* we already have this db locally */
514                         if (i!=(*dbmap)->num) {
515                                 continue;
516                         }
517                         /* ok so we need to create this database and
518                            rebuild dbmap
519                          */
520                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
521                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
522                         if (ret != 0) {
523                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
524                                           nodemap->nodes[j].pnn));
525                                 return -1;
526                         }
527                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
528                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
529                         if (ret != 0) {
530                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
531                                 return -1;
532                         }
533                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
534                         if (ret != 0) {
535                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
536                                 return -1;
537                         }
538                 }
539         }
540
541         return 0;
542 }
543
544
545 /*
546   pull the remote database contents from one node into the recdb
547  */
548 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
549                                     struct tdb_wrap *recdb, uint32_t dbid)
550 {
551         int ret;
552         TDB_DATA outdata;
553         struct ctdb_marshall_buffer *reply;
554         struct ctdb_rec_data *rec;
555         int i;
556         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
557
558         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
559                                CONTROL_TIMEOUT(), &outdata);
560         if (ret != 0) {
561                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
562                 talloc_free(tmp_ctx);
563                 return -1;
564         }
565
566         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
567
568         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
569                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
570                 talloc_free(tmp_ctx);
571                 return -1;
572         }
573         
574         rec = (struct ctdb_rec_data *)&reply->data[0];
575         
576         for (i=0;
577              i<reply->count;
578              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
579                 TDB_DATA key, data;
580                 struct ctdb_ltdb_header *hdr;
581                 TDB_DATA existing;
582                 
583                 key.dptr = &rec->data[0];
584                 key.dsize = rec->keylen;
585                 data.dptr = &rec->data[key.dsize];
586                 data.dsize = rec->datalen;
587                 
588                 hdr = (struct ctdb_ltdb_header *)data.dptr;
589
590                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
591                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
592                         talloc_free(tmp_ctx);
593                         return -1;
594                 }
595
596                 /* fetch the existing record, if any */
597                 existing = tdb_fetch(recdb->tdb, key);
598                 
599                 if (existing.dptr != NULL) {
600                         struct ctdb_ltdb_header header;
601                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
602                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
603                                          (unsigned)existing.dsize, srcnode));
604                                 free(existing.dptr);
605                                 talloc_free(tmp_ctx);
606                                 return -1;
607                         }
608                         header = *(struct ctdb_ltdb_header *)existing.dptr;
609                         free(existing.dptr);
610                         if (!(header.rsn < hdr->rsn ||
611                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
612                                 continue;
613                         }
614                 }
615                 
616                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
617                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
618                         talloc_free(tmp_ctx);
619                         return -1;                              
620                 }
621         }
622
623         talloc_free(tmp_ctx);
624
625         return 0;
626 }
627
628
629 struct pull_seqnum_cbdata {
630         int failed;
631         uint32_t pnn;
632         uint64_t seqnum;
633 };
634
635 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
636 {
637         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
638         uint64_t seqnum;
639
640         if (cb_data->failed != 0) {
641                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
642                 return;
643         }
644
645         if (res != 0) {
646                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
647                 cb_data->failed = 1;
648                 return;
649         }
650
651         if (outdata.dsize != sizeof(uint64_t)) {
652                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
653                 cb_data->failed = -1;
654                 return;
655         }
656
657         seqnum = *((uint64_t *)outdata.dptr);
658
659         if (seqnum > cb_data->seqnum) {
660                 cb_data->seqnum = seqnum;
661                 cb_data->pnn = node_pnn;
662         }
663 }
664
665 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
666 {
667         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
668
669         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
670         cb_data->failed = 1;
671 }
672
673 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
674                                 struct ctdb_recoverd *rec, 
675                                 struct ctdb_node_map *nodemap, 
676                                 struct tdb_wrap *recdb, uint32_t dbid)
677 {
678         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
679         uint32_t *nodes;
680         TDB_DATA data;
681         uint32_t outdata[2];
682         struct pull_seqnum_cbdata *cb_data;
683
684         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
685
686         outdata[0] = dbid;
687         outdata[1] = 0;
688
689         data.dsize = sizeof(outdata);
690         data.dptr  = (uint8_t *)&outdata[0];
691
692         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
693         if (cb_data == NULL) {
694                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
695                 talloc_free(tmp_ctx);
696                 return -1;
697         }
698
699         cb_data->failed = 0;
700         cb_data->pnn    = -1;
701         cb_data->seqnum = 0;
702         
703         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
704         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
705                                         nodes, 0,
706                                         CONTROL_TIMEOUT(), false, data,
707                                         pull_seqnum_cb,
708                                         pull_seqnum_fail_cb,
709                                         cb_data) != 0) {
710                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
711
712                 talloc_free(tmp_ctx);
713                 return -1;
714         }
715
716         if (cb_data->failed != 0) {
717                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
718                 talloc_free(tmp_ctx);
719                 return -1;
720         }
721
722         if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
723                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
724                 talloc_free(tmp_ctx);
725                 return -1;
726         }
727
728         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
729
730         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
731                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
732                 talloc_free(tmp_ctx);
733                 return -1;
734         }
735
736         talloc_free(tmp_ctx);
737         return 0;
738 }
739
740
741 /*
742   pull all the remote database contents into the recdb
743  */
744 static int pull_remote_database(struct ctdb_context *ctdb,
745                                 struct ctdb_recoverd *rec, 
746                                 struct ctdb_node_map *nodemap, 
747                                 struct tdb_wrap *recdb, uint32_t dbid,
748                                 bool persistent)
749 {
750         int j;
751
752         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
753                 int ret;
754                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
755                 if (ret == 0) {
756                         return 0;
757                 }
758         }
759
760         /* pull all records from all other nodes across onto this node
761            (this merges based on rsn)
762         */
763         for (j=0; j<nodemap->num; j++) {
764                 /* dont merge from nodes that are unavailable */
765                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
766                         continue;
767                 }
768                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
769                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
770                                  nodemap->nodes[j].pnn));
771                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
772                         return -1;
773                 }
774         }
775         
776         return 0;
777 }
778
779
780 /*
781   update flags on all active nodes
782  */
783 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
784 {
785         int ret;
786
787         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
788                 if (ret != 0) {
789                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
790                 return -1;
791         }
792
793         return 0;
794 }
795
796 /*
797   ensure all nodes have the same vnnmap we do
798  */
799 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
800                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
801 {
802         int j, ret;
803
804         /* push the new vnn map out to all the nodes */
805         for (j=0; j<nodemap->num; j++) {
806                 /* dont push to nodes that are unavailable */
807                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
808                         continue;
809                 }
810
811                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
812                 if (ret != 0) {
813                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
814                         return -1;
815                 }
816         }
817
818         return 0;
819 }
820
821
822 struct vacuum_info {
823         struct vacuum_info *next, *prev;
824         struct ctdb_recoverd *rec;
825         uint32_t srcnode;
826         struct ctdb_db_context *ctdb_db;
827         struct ctdb_marshall_buffer *recs;
828         struct ctdb_rec_data *r;
829 };
830
831 static void vacuum_fetch_next(struct vacuum_info *v);
832
833 /*
834   called when a vacuum fetch has completed - just free it and do the next one
835  */
836 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
837 {
838         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
839         talloc_free(state);
840         vacuum_fetch_next(v);
841 }
842
843
844 /*
845   process the next element from the vacuum list
846 */
847 static void vacuum_fetch_next(struct vacuum_info *v)
848 {
849         struct ctdb_call call;
850         struct ctdb_rec_data *r;
851
852         while (v->recs->count) {
853                 struct ctdb_client_call_state *state;
854                 TDB_DATA data;
855                 struct ctdb_ltdb_header *hdr;
856
857                 ZERO_STRUCT(call);
858                 call.call_id = CTDB_NULL_FUNC;
859                 call.flags = CTDB_IMMEDIATE_MIGRATION;
860                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
861
862                 r = v->r;
863                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
864                 v->recs->count--;
865
866                 call.key.dptr = &r->data[0];
867                 call.key.dsize = r->keylen;
868
869                 /* ensure we don't block this daemon - just skip a record if we can't get
870                    the chainlock */
871                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
872                         continue;
873                 }
874
875                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
876                 if (data.dptr == NULL) {
877                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
878                         continue;
879                 }
880
881                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
882                         free(data.dptr);
883                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
884                         continue;
885                 }
886                 
887                 hdr = (struct ctdb_ltdb_header *)data.dptr;
888                 if (hdr->dmaster == v->rec->ctdb->pnn) {
889                         /* its already local */
890                         free(data.dptr);
891                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
892                         continue;
893                 }
894
895                 free(data.dptr);
896
897                 state = ctdb_call_send(v->ctdb_db, &call);
898                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
899                 if (state == NULL) {
900                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
901                         talloc_free(v);
902                         return;
903                 }
904                 state->async.fn = vacuum_fetch_callback;
905                 state->async.private_data = v;
906                 return;
907         }
908
909         talloc_free(v);
910 }
911
912
913 /*
914   destroy a vacuum info structure
915  */
916 static int vacuum_info_destructor(struct vacuum_info *v)
917 {
918         DLIST_REMOVE(v->rec->vacuum_info, v);
919         return 0;
920 }
921
922
923 /*
924   handler for vacuum fetch
925 */
926 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
927                                  TDB_DATA data, void *private_data)
928 {
929         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
930         struct ctdb_marshall_buffer *recs;
931         int ret, i;
932         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
933         const char *name;
934         struct ctdb_dbid_map *dbmap=NULL;
935         bool persistent = false;
936         struct ctdb_db_context *ctdb_db;
937         struct ctdb_rec_data *r;
938         uint32_t srcnode;
939         struct vacuum_info *v;
940
941         recs = (struct ctdb_marshall_buffer *)data.dptr;
942         r = (struct ctdb_rec_data *)&recs->data[0];
943
944         if (recs->count == 0) {
945                 talloc_free(tmp_ctx);
946                 return;
947         }
948
949         srcnode = r->reqid;
950
951         for (v=rec->vacuum_info;v;v=v->next) {
952                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
953                         /* we're already working on records from this node */
954                         talloc_free(tmp_ctx);
955                         return;
956                 }
957         }
958
959         /* work out if the database is persistent */
960         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
961         if (ret != 0) {
962                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
963                 talloc_free(tmp_ctx);
964                 return;
965         }
966
967         for (i=0;i<dbmap->num;i++) {
968                 if (dbmap->dbs[i].dbid == recs->db_id) {
969                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
970                         break;
971                 }
972         }
973         if (i == dbmap->num) {
974                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
975                 talloc_free(tmp_ctx);
976                 return;         
977         }
978
979         /* find the name of this database */
980         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
981                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
982                 talloc_free(tmp_ctx);
983                 return;
984         }
985
986         /* attach to it */
987         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
988         if (ctdb_db == NULL) {
989                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
990                 talloc_free(tmp_ctx);
991                 return;
992         }
993
994         v = talloc_zero(rec, struct vacuum_info);
995         if (v == NULL) {
996                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
997                 talloc_free(tmp_ctx);
998                 return;
999         }
1000
1001         v->rec = rec;
1002         v->srcnode = srcnode;
1003         v->ctdb_db = ctdb_db;
1004         v->recs = talloc_memdup(v, recs, data.dsize);
1005         if (v->recs == NULL) {
1006                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1007                 talloc_free(v);
1008                 talloc_free(tmp_ctx);
1009                 return;         
1010         }
1011         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
1012
1013         DLIST_ADD(rec->vacuum_info, v);
1014
1015         talloc_set_destructor(v, vacuum_info_destructor);
1016
1017         vacuum_fetch_next(v);
1018         talloc_free(tmp_ctx);
1019 }
1020
1021
1022 /*
1023   called when ctdb_wait_timeout should finish
1024  */
1025 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1026                               struct timeval yt, void *p)
1027 {
1028         uint32_t *timed_out = (uint32_t *)p;
1029         (*timed_out) = 1;
1030 }
1031
1032 /*
1033   wait for a given number of seconds
1034  */
1035 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1036 {
1037         uint32_t timed_out = 0;
1038         time_t usecs = (secs - (time_t)secs) * 1000000;
1039         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1040         while (!timed_out) {
1041                 event_loop_once(ctdb->ev);
1042         }
1043 }
1044
1045 /*
1046   called when an election times out (ends)
1047  */
1048 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1049                                   struct timeval t, void *p)
1050 {
1051         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1052         rec->election_timeout = NULL;
1053         fast_start = false;
1054
1055         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1056 }
1057
1058
1059 /*
1060   wait for an election to finish. It finished election_timeout seconds after
1061   the last election packet is received
1062  */
1063 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1064 {
1065         struct ctdb_context *ctdb = rec->ctdb;
1066         while (rec->election_timeout) {
1067                 event_loop_once(ctdb->ev);
1068         }
1069 }
1070
1071 /*
1072   Update our local flags from all remote connected nodes. 
1073   This is only run when we are or we belive we are the recovery master
1074  */
1075 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1076 {
1077         int j;
1078         struct ctdb_context *ctdb = rec->ctdb;
1079         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1080
1081         /* get the nodemap for all active remote nodes and verify
1082            they are the same as for this node
1083          */
1084         for (j=0; j<nodemap->num; j++) {
1085                 struct ctdb_node_map *remote_nodemap=NULL;
1086                 int ret;
1087
1088                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1089                         continue;
1090                 }
1091                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1092                         continue;
1093                 }
1094
1095                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1096                                            mem_ctx, &remote_nodemap);
1097                 if (ret != 0) {
1098                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1099                                   nodemap->nodes[j].pnn));
1100                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1101                         talloc_free(mem_ctx);
1102                         return MONITOR_FAILED;
1103                 }
1104                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1105                         /* We should tell our daemon about this so it
1106                            updates its flags or else we will log the same 
1107                            message again in the next iteration of recovery.
1108                            Since we are the recovery master we can just as
1109                            well update the flags on all nodes.
1110                         */
1111                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
1112                         if (ret != 0) {
1113                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1114                                 return -1;
1115                         }
1116
1117                         /* Update our local copy of the flags in the recovery
1118                            daemon.
1119                         */
1120                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1121                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1122                                  nodemap->nodes[j].flags));
1123                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1124                 }
1125                 talloc_free(remote_nodemap);
1126         }
1127         talloc_free(mem_ctx);
1128         return MONITOR_OK;
1129 }
1130
1131
1132 /* Create a new random generation ip. 
1133    The generation id can not be the INVALID_GENERATION id
1134 */
1135 static uint32_t new_generation(void)
1136 {
1137         uint32_t generation;
1138
1139         while (1) {
1140                 generation = random();
1141
1142                 if (generation != INVALID_GENERATION) {
1143                         break;
1144                 }
1145         }
1146
1147         return generation;
1148 }
1149
1150
1151 /*
1152   create a temporary working database
1153  */
1154 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1155 {
1156         char *name;
1157         struct tdb_wrap *recdb;
1158         unsigned tdb_flags;
1159
1160         /* open up the temporary recovery database */
1161         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1162                                ctdb->db_directory_state,
1163                                ctdb->pnn);
1164         if (name == NULL) {
1165                 return NULL;
1166         }
1167         unlink(name);
1168
1169         tdb_flags = TDB_NOLOCK;
1170         if (ctdb->valgrinding) {
1171                 tdb_flags |= TDB_NOMMAP;
1172         }
1173         tdb_flags |= TDB_DISALLOW_NESTING;
1174
1175         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1176                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1177         if (recdb == NULL) {
1178                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1179         }
1180
1181         talloc_free(name);
1182
1183         return recdb;
1184 }
1185
1186
1187 /* 
1188    a traverse function for pulling all relevant records from recdb
1189  */
1190 struct recdb_data {
1191         struct ctdb_context *ctdb;
1192         struct ctdb_marshall_buffer *recdata;
1193         uint32_t len;
1194         uint32_t allocated_len;
1195         bool failed;
1196         bool persistent;
1197 };
1198
1199 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1200 {
1201         struct recdb_data *params = (struct recdb_data *)p;
1202         struct ctdb_rec_data *rec;
1203         struct ctdb_ltdb_header *hdr;
1204
1205         /*
1206          * skip empty records - but NOT for persistent databases:
1207          *
1208          * The record-by-record mode of recovery deletes empty records.
1209          * For persistent databases, this can lead to data corruption
1210          * by deleting records that should be there:
1211          *
1212          * - Assume the cluster has been running for a while.
1213          *
1214          * - A record R in a persistent database has been created and
1215          *   deleted a couple of times, the last operation being deletion,
1216          *   leaving an empty record with a high RSN, say 10.
1217          *
1218          * - Now a node N is turned off.
1219          *
1220          * - This leaves the local database copy of D on N with the empty
1221          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1222          *   the copy of record R.
1223          *
1224          * - Now the record is created again while node N is turned off.
1225          *   This creates R with RSN = 1 on all nodes except for N.
1226          *
1227          * - Now node N is turned on again. The following recovery will chose
1228          *   the older empty copy of R due to RSN 10 > RSN 1.
1229          *
1230          * ==> Hence the record is gone after the recovery.
1231          *
1232          * On databases like Samba's registry, this can damage the higher-level
1233          * data structures built from the various tdb-level records.
1234          */
1235         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1236                 return 0;
1237         }
1238
1239         /* update the dmaster field to point to us */
1240         hdr = (struct ctdb_ltdb_header *)data.dptr;
1241         if (!params->persistent) {
1242                 hdr->dmaster = params->ctdb->pnn;
1243                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1244         }
1245
1246         /* add the record to the blob ready to send to the nodes */
1247         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1248         if (rec == NULL) {
1249                 params->failed = true;
1250                 return -1;
1251         }
1252         if (params->len + rec->length >= params->allocated_len) {
1253                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1254                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1255         }
1256         if (params->recdata == NULL) {
1257                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1258                          rec->length + params->len, params->recdata->count));
1259                 params->failed = true;
1260                 return -1;
1261         }
1262         params->recdata->count++;
1263         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1264         params->len += rec->length;
1265         talloc_free(rec);
1266
1267         return 0;
1268 }
1269
1270 /*
1271   push the recdb database out to all nodes
1272  */
1273 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1274                                bool persistent,
1275                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1276 {
1277         struct recdb_data params;
1278         struct ctdb_marshall_buffer *recdata;
1279         TDB_DATA outdata;
1280         TALLOC_CTX *tmp_ctx;
1281         uint32_t *nodes;
1282
1283         tmp_ctx = talloc_new(ctdb);
1284         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1285
1286         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1287         CTDB_NO_MEMORY(ctdb, recdata);
1288
1289         recdata->db_id = dbid;
1290
1291         params.ctdb = ctdb;
1292         params.recdata = recdata;
1293         params.len = offsetof(struct ctdb_marshall_buffer, data);
1294         params.allocated_len = params.len;
1295         params.failed = false;
1296         params.persistent = persistent;
1297
1298         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1299                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1300                 talloc_free(params.recdata);
1301                 talloc_free(tmp_ctx);
1302                 return -1;
1303         }
1304
1305         if (params.failed) {
1306                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1307                 talloc_free(params.recdata);
1308                 talloc_free(tmp_ctx);
1309                 return -1;              
1310         }
1311
1312         recdata = params.recdata;
1313
1314         outdata.dptr = (void *)recdata;
1315         outdata.dsize = params.len;
1316
1317         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1318         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1319                                         nodes, 0,
1320                                         CONTROL_TIMEOUT(), false, outdata,
1321                                         NULL, NULL,
1322                                         NULL) != 0) {
1323                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1324                 talloc_free(recdata);
1325                 talloc_free(tmp_ctx);
1326                 return -1;
1327         }
1328
1329         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1330                   dbid, recdata->count));
1331
1332         talloc_free(recdata);
1333         talloc_free(tmp_ctx);
1334
1335         return 0;
1336 }
1337
1338
1339 /*
1340   go through a full recovery on one database 
1341  */
1342 static int recover_database(struct ctdb_recoverd *rec, 
1343                             TALLOC_CTX *mem_ctx,
1344                             uint32_t dbid,
1345                             bool persistent,
1346                             uint32_t pnn, 
1347                             struct ctdb_node_map *nodemap,
1348                             uint32_t transaction_id)
1349 {
1350         struct tdb_wrap *recdb;
1351         int ret;
1352         struct ctdb_context *ctdb = rec->ctdb;
1353         TDB_DATA data;
1354         struct ctdb_control_wipe_database w;
1355         uint32_t *nodes;
1356
1357         recdb = create_recdb(ctdb, mem_ctx);
1358         if (recdb == NULL) {
1359                 return -1;
1360         }
1361
1362         /* pull all remote databases onto the recdb */
1363         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1364         if (ret != 0) {
1365                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1366                 return -1;
1367         }
1368
1369         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1370
1371         /* wipe all the remote databases. This is safe as we are in a transaction */
1372         w.db_id = dbid;
1373         w.transaction_id = transaction_id;
1374
1375         data.dptr = (void *)&w;
1376         data.dsize = sizeof(w);
1377
1378         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1379         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1380                                         nodes, 0,
1381                                         CONTROL_TIMEOUT(), false, data,
1382                                         NULL, NULL,
1383                                         NULL) != 0) {
1384                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1385                 talloc_free(recdb);
1386                 return -1;
1387         }
1388         
1389         /* push out the correct database. This sets the dmaster and skips 
1390            the empty records */
1391         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1392         if (ret != 0) {
1393                 talloc_free(recdb);
1394                 return -1;
1395         }
1396
1397         /* all done with this database */
1398         talloc_free(recdb);
1399
1400         return 0;
1401 }
1402
1403 /*
1404   reload the nodes file 
1405 */
1406 static void reload_nodes_file(struct ctdb_context *ctdb)
1407 {
1408         ctdb->nodes = NULL;
1409         ctdb_load_nodes_file(ctdb);
1410 }
1411
1412 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1413                                          struct ctdb_recoverd *rec,
1414                                          struct ctdb_node_map *nodemap,
1415                                          uint32_t *culprit)
1416 {
1417         int j;
1418         int ret;
1419
1420         if (ctdb->num_nodes != nodemap->num) {
1421                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1422                                   ctdb->num_nodes, nodemap->num));
1423                 if (culprit) {
1424                         *culprit = ctdb->pnn;
1425                 }
1426                 return -1;
1427         }
1428
1429         for (j=0; j<nodemap->num; j++) {
1430                 /* release any existing data */
1431                 if (ctdb->nodes[j]->known_public_ips) {
1432                         talloc_free(ctdb->nodes[j]->known_public_ips);
1433                         ctdb->nodes[j]->known_public_ips = NULL;
1434                 }
1435                 if (ctdb->nodes[j]->available_public_ips) {
1436                         talloc_free(ctdb->nodes[j]->available_public_ips);
1437                         ctdb->nodes[j]->available_public_ips = NULL;
1438                 }
1439
1440                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1441                         continue;
1442                 }
1443
1444                 /* grab a new shiny list of public ips from the node */
1445                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1446                                         CONTROL_TIMEOUT(),
1447                                         ctdb->nodes[j]->pnn,
1448                                         ctdb->nodes,
1449                                         0,
1450                                         &ctdb->nodes[j]->known_public_ips);
1451                 if (ret != 0) {
1452                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1453                                 ctdb->nodes[j]->pnn));
1454                         if (culprit) {
1455                                 *culprit = ctdb->nodes[j]->pnn;
1456                         }
1457                         return -1;
1458                 }
1459
1460                 if (ctdb->do_checkpublicip) {
1461                         if (rec->ip_check_disable_ctx == NULL) {
1462                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1463                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1464                                         rec->need_takeover_run = true;
1465                                 }
1466                         }
1467                 }
1468
1469                 /* grab a new shiny list of public ips from the node */
1470                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1471                                         CONTROL_TIMEOUT(),
1472                                         ctdb->nodes[j]->pnn,
1473                                         ctdb->nodes,
1474                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1475                                         &ctdb->nodes[j]->available_public_ips);
1476                 if (ret != 0) {
1477                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1478                                 ctdb->nodes[j]->pnn));
1479                         if (culprit) {
1480                                 *culprit = ctdb->nodes[j]->pnn;
1481                         }
1482                         return -1;
1483                 }
1484         }
1485
1486         return 0;
1487 }
1488
1489 /* when we start a recovery, make sure all nodes use the same reclock file
1490    setting
1491 */
1492 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1493 {
1494         struct ctdb_context *ctdb = rec->ctdb;
1495         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1496         TDB_DATA data;
1497         uint32_t *nodes;
1498
1499         if (ctdb->recovery_lock_file == NULL) {
1500                 data.dptr  = NULL;
1501                 data.dsize = 0;
1502         } else {
1503                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1504                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1505         }
1506
1507         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1508         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1509                                         nodes, 0,
1510                                         CONTROL_TIMEOUT(),
1511                                         false, data,
1512                                         NULL, NULL,
1513                                         rec) != 0) {
1514                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1515                 talloc_free(tmp_ctx);
1516                 return -1;
1517         }
1518
1519         talloc_free(tmp_ctx);
1520         return 0;
1521 }
1522
1523
1524 /*
1525  * this callback is called for every node that failed to execute ctdb_takeover_run()
1526  * and set flag to re-run takeover run.
1527  */
1528 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1529 {
1530         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1531
1532         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the takeover run. Setting it as recovery fail culprit\n", node_pnn));
1533
1534         ctdb_set_culprit(rec, node_pnn);
1535         rec->need_takeover_run = true;
1536 }
1537
1538
1539 /*
1540   we are the recmaster, and recovery is needed - start a recovery run
1541  */
1542 static int do_recovery(struct ctdb_recoverd *rec, 
1543                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1544                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1545 {
1546         struct ctdb_context *ctdb = rec->ctdb;
1547         int i, j, ret;
1548         uint32_t generation;
1549         struct ctdb_dbid_map *dbmap;
1550         TDB_DATA data;
1551         uint32_t *nodes;
1552         struct timeval start_time;
1553         uint32_t culprit = (uint32_t)-1;
1554
1555         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1556
1557         /* if recovery fails, force it again */
1558         rec->need_recovery = true;
1559
1560         for (i=0; i<ctdb->num_nodes; i++) {
1561                 struct ctdb_banning_state *ban_state;
1562
1563                 if (ctdb->nodes[i]->ban_state == NULL) {
1564                         continue;
1565                 }
1566                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1567                 if (ban_state->count < 2*ctdb->num_nodes) {
1568                         continue;
1569                 }
1570                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1571                         ctdb->nodes[i]->pnn, ban_state->count,
1572                         ctdb->tunable.recovery_ban_period));
1573                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1574                 ban_state->count = 0;
1575         }
1576
1577
1578         if (ctdb->tunable.verify_recovery_lock != 0) {
1579                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1580                 start_time = timeval_current();
1581                 if (!ctdb_recovery_lock(ctdb, true)) {
1582                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1583                                          "and ban ourself for %u seconds\n",
1584                                          ctdb->tunable.recovery_ban_period));
1585                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1586                         return -1;
1587                 }
1588                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1589                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1590         }
1591
1592         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1593
1594         /* get a list of all databases */
1595         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1596         if (ret != 0) {
1597                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1598                 return -1;
1599         }
1600
1601         /* we do the db creation before we set the recovery mode, so the freeze happens
1602            on all databases we will be dealing with. */
1603
1604         /* verify that we have all the databases any other node has */
1605         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1606         if (ret != 0) {
1607                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1608                 return -1;
1609         }
1610
1611         /* verify that all other nodes have all our databases */
1612         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1613         if (ret != 0) {
1614                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1615                 return -1;
1616         }
1617         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1618
1619         /* update the database priority for all remote databases */
1620         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1621         if (ret != 0) {
1622                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1623         }
1624         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1625
1626
1627         /* update all other nodes to use the same setting for reclock files
1628            as the local recovery master.
1629         */
1630         sync_recovery_lock_file_across_cluster(rec);
1631
1632         /* set recovery mode to active on all nodes */
1633         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1634         if (ret != 0) {
1635                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1636                 return -1;
1637         }
1638
1639         /* execute the "startrecovery" event script on all nodes */
1640         ret = run_startrecovery_eventscript(rec, nodemap);
1641         if (ret!=0) {
1642                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1643                 return -1;
1644         }
1645
1646         /*
1647           update all nodes to have the same flags that we have
1648          */
1649         for (i=0;i<nodemap->num;i++) {
1650                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1651                         continue;
1652                 }
1653
1654                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1655                 if (ret != 0) {
1656                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1657                         return -1;
1658                 }
1659         }
1660
1661         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1662
1663         /* pick a new generation number */
1664         generation = new_generation();
1665
1666         /* change the vnnmap on this node to use the new generation 
1667            number but not on any other nodes.
1668            this guarantees that if we abort the recovery prematurely
1669            for some reason (a node stops responding?)
1670            that we can just return immediately and we will reenter
1671            recovery shortly again.
1672            I.e. we deliberately leave the cluster with an inconsistent
1673            generation id to allow us to abort recovery at any stage and
1674            just restart it from scratch.
1675          */
1676         vnnmap->generation = generation;
1677         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1678         if (ret != 0) {
1679                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1680                 return -1;
1681         }
1682
1683         data.dptr = (void *)&generation;
1684         data.dsize = sizeof(uint32_t);
1685
1686         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1687         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1688                                         nodes, 0,
1689                                         CONTROL_TIMEOUT(), false, data,
1690                                         NULL,
1691                                         transaction_start_fail_callback,
1692                                         rec) != 0) {
1693                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1694                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1695                                         nodes, 0,
1696                                         CONTROL_TIMEOUT(), false, tdb_null,
1697                                         NULL,
1698                                         NULL,
1699                                         NULL) != 0) {
1700                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1701                 }
1702                 return -1;
1703         }
1704
1705         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1706
1707         for (i=0;i<dbmap->num;i++) {
1708                 ret = recover_database(rec, mem_ctx,
1709                                        dbmap->dbs[i].dbid,
1710                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1711                                        pnn, nodemap, generation);
1712                 if (ret != 0) {
1713                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1714                         return -1;
1715                 }
1716         }
1717
1718         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1719
1720         /* commit all the changes */
1721         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1722                                         nodes, 0,
1723                                         CONTROL_TIMEOUT(), false, data,
1724                                         NULL, NULL,
1725                                         NULL) != 0) {
1726                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1727                 return -1;
1728         }
1729
1730         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1731         
1732
1733         /* update the capabilities for all nodes */
1734         ret = update_capabilities(ctdb, nodemap);
1735         if (ret!=0) {
1736                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1737                 return -1;
1738         }
1739
1740         /* build a new vnn map with all the currently active and
1741            unbanned nodes */
1742         generation = new_generation();
1743         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1744         CTDB_NO_MEMORY(ctdb, vnnmap);
1745         vnnmap->generation = generation;
1746         vnnmap->size = 0;
1747         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1748         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1749         for (i=j=0;i<nodemap->num;i++) {
1750                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1751                         continue;
1752                 }
1753                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1754                         /* this node can not be an lmaster */
1755                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1756                         continue;
1757                 }
1758
1759                 vnnmap->size++;
1760                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1761                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1762                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1763
1764         }
1765         if (vnnmap->size == 0) {
1766                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1767                 vnnmap->size++;
1768                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1769                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1770                 vnnmap->map[0] = pnn;
1771         }       
1772
1773         /* update to the new vnnmap on all nodes */
1774         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1775         if (ret != 0) {
1776                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1777                 return -1;
1778         }
1779
1780         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1781
1782         /* update recmaster to point to us for all nodes */
1783         ret = set_recovery_master(ctdb, nodemap, pnn);
1784         if (ret!=0) {
1785                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1786                 return -1;
1787         }
1788
1789         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1790
1791         /*
1792           update all nodes to have the same flags that we have
1793          */
1794         for (i=0;i<nodemap->num;i++) {
1795                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1796                         continue;
1797                 }
1798
1799                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1800                 if (ret != 0) {
1801                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1802                         return -1;
1803                 }
1804         }
1805
1806         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1807
1808         /* disable recovery mode */
1809         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1810         if (ret != 0) {
1811                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1812                 return -1;
1813         }
1814
1815         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1816
1817         /*
1818           tell nodes to takeover their public IPs
1819          */
1820         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1821         if (ret != 0) {
1822                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1823                                  culprit));
1824                 rec->need_takeover_run = true;
1825                 return -1;
1826         }
1827         rec->need_takeover_run = false;
1828         ret = ctdb_takeover_run(ctdb, nodemap, NULL, NULL);
1829         if (ret != 0) {
1830                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1831                 rec->need_takeover_run = true;
1832         }
1833
1834         /* execute the "recovered" event script on all nodes */
1835         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
1836         if (ret!=0) {
1837                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1838                 return -1;
1839         }
1840
1841         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1842
1843         /* send a message to all clients telling them that the cluster 
1844            has been reconfigured */
1845         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1846
1847         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1848
1849         rec->need_recovery = false;
1850
1851         /* we managed to complete a full recovery, make sure to forgive
1852            any past sins by the nodes that could now participate in the
1853            recovery.
1854         */
1855         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1856         for (i=0;i<nodemap->num;i++) {
1857                 struct ctdb_banning_state *ban_state;
1858
1859                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1860                         continue;
1861                 }
1862
1863                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1864                 if (ban_state == NULL) {
1865                         continue;
1866                 }
1867
1868                 ban_state->count = 0;
1869         }
1870
1871
1872         /* We just finished a recovery successfully. 
1873            We now wait for rerecovery_timeout before we allow 
1874            another recovery to take place.
1875         */
1876         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1877         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1878         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1879
1880         return 0;
1881 }
1882
1883
1884 /*
1885   elections are won by first checking the number of connected nodes, then
1886   the priority time, then the pnn
1887  */
1888 struct election_message {
1889         uint32_t num_connected;
1890         struct timeval priority_time;
1891         uint32_t pnn;
1892         uint32_t node_flags;
1893 };
1894
1895 /*
1896   form this nodes election data
1897  */
1898 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1899 {
1900         int ret, i;
1901         struct ctdb_node_map *nodemap;
1902         struct ctdb_context *ctdb = rec->ctdb;
1903
1904         ZERO_STRUCTP(em);
1905
1906         em->pnn = rec->ctdb->pnn;
1907         em->priority_time = rec->priority_time;
1908
1909         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1910         if (ret != 0) {
1911                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1912                 return;
1913         }
1914
1915         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1916         em->node_flags = rec->node_flags;
1917
1918         for (i=0;i<nodemap->num;i++) {
1919                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1920                         em->num_connected++;
1921                 }
1922         }
1923
1924         /* we shouldnt try to win this election if we cant be a recmaster */
1925         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1926                 em->num_connected = 0;
1927                 em->priority_time = timeval_current();
1928         }
1929
1930         talloc_free(nodemap);
1931 }
1932
1933 /*
1934   see if the given election data wins
1935  */
1936 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1937 {
1938         struct election_message myem;
1939         int cmp = 0;
1940
1941         ctdb_election_data(rec, &myem);
1942
1943         /* we cant win if we dont have the recmaster capability */
1944         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1945                 return false;
1946         }
1947
1948         /* we cant win if we are banned */
1949         if (rec->node_flags & NODE_FLAGS_BANNED) {
1950                 return false;
1951         }       
1952
1953         /* we cant win if we are stopped */
1954         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1955                 return false;
1956         }       
1957
1958         /* we will automatically win if the other node is banned */
1959         if (em->node_flags & NODE_FLAGS_BANNED) {
1960                 return true;
1961         }
1962
1963         /* we will automatically win if the other node is banned */
1964         if (em->node_flags & NODE_FLAGS_STOPPED) {
1965                 return true;
1966         }
1967
1968         /* try to use the most connected node */
1969         if (cmp == 0) {
1970                 cmp = (int)myem.num_connected - (int)em->num_connected;
1971         }
1972
1973         /* then the longest running node */
1974         if (cmp == 0) {
1975                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1976         }
1977
1978         if (cmp == 0) {
1979                 cmp = (int)myem.pnn - (int)em->pnn;
1980         }
1981
1982         return cmp > 0;
1983 }
1984
1985 /*
1986   send out an election request
1987  */
1988 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1989 {
1990         int ret;
1991         TDB_DATA election_data;
1992         struct election_message emsg;
1993         uint64_t srvid;
1994         struct ctdb_context *ctdb = rec->ctdb;
1995
1996         srvid = CTDB_SRVID_RECOVERY;
1997
1998         ctdb_election_data(rec, &emsg);
1999
2000         election_data.dsize = sizeof(struct election_message);
2001         election_data.dptr  = (unsigned char *)&emsg;
2002
2003
2004         /* send an election message to all active nodes */
2005         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2006         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2007
2008
2009         /* A new node that is already frozen has entered the cluster.
2010            The existing nodes are not frozen and dont need to be frozen
2011            until the election has ended and we start the actual recovery
2012         */
2013         if (update_recmaster == true) {
2014                 /* first we assume we will win the election and set 
2015                    recoverymaster to be ourself on the current node
2016                  */
2017                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2018                 if (ret != 0) {
2019                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2020                         return -1;
2021                 }
2022         }
2023
2024
2025         return 0;
2026 }
2027
2028 /*
2029   this function will unban all nodes in the cluster
2030 */
2031 static void unban_all_nodes(struct ctdb_context *ctdb)
2032 {
2033         int ret, i;
2034         struct ctdb_node_map *nodemap;
2035         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2036         
2037         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2038         if (ret != 0) {
2039                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2040                 return;
2041         }
2042
2043         for (i=0;i<nodemap->num;i++) {
2044                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2045                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2046                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
2047                 }
2048         }
2049
2050         talloc_free(tmp_ctx);
2051 }
2052
2053
2054 /*
2055   we think we are winning the election - send a broadcast election request
2056  */
2057 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2058 {
2059         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2060         int ret;
2061
2062         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
2063         if (ret != 0) {
2064                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2065         }
2066
2067         talloc_free(rec->send_election_te);
2068         rec->send_election_te = NULL;
2069 }
2070
2071 /*
2072   handler for memory dumps
2073 */
2074 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2075                              TDB_DATA data, void *private_data)
2076 {
2077         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2078         TDB_DATA *dump;
2079         int ret;
2080         struct rd_memdump_reply *rd;
2081
2082         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2083                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2084                 talloc_free(tmp_ctx);
2085                 return;
2086         }
2087         rd = (struct rd_memdump_reply *)data.dptr;
2088
2089         dump = talloc_zero(tmp_ctx, TDB_DATA);
2090         if (dump == NULL) {
2091                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2092                 talloc_free(tmp_ctx);
2093                 return;
2094         }
2095         ret = ctdb_dump_memory(ctdb, dump);
2096         if (ret != 0) {
2097                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2098                 talloc_free(tmp_ctx);
2099                 return;
2100         }
2101
2102 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
2103
2104         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2105         if (ret != 0) {
2106                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2107                 talloc_free(tmp_ctx);
2108                 return;
2109         }
2110
2111         talloc_free(tmp_ctx);
2112 }
2113
2114 /*
2115   handler for getlog
2116 */
2117 static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2118                            TDB_DATA data, void *private_data)
2119 {
2120         struct ctdb_get_log_addr *log_addr;
2121         pid_t child;
2122
2123         if (data.dsize != sizeof(struct ctdb_get_log_addr)) {
2124                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2125                 return;
2126         }
2127         log_addr = (struct ctdb_get_log_addr *)data.dptr;
2128
2129         child = ctdb_fork_no_free_ringbuffer(ctdb);
2130         if (child == (pid_t)-1) {
2131                 DEBUG(DEBUG_ERR,("Failed to fork a log collector child\n"));
2132                 return;
2133         }
2134
2135         if (child == 0) {
2136                 if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
2137                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch log collector child into client mode.\n"));
2138                         _exit(1);
2139                 }
2140                 ctdb_collect_log(ctdb, log_addr);
2141                 _exit(0);
2142         }
2143 }
2144
2145 /*
2146   handler for clearlog
2147 */
2148 static void clearlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2149                              TDB_DATA data, void *private_data)
2150 {
2151         ctdb_clear_log(ctdb);
2152 }
2153
2154 /*
2155   handler for reload_nodes
2156 */
2157 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2158                              TDB_DATA data, void *private_data)
2159 {
2160         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2161
2162         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2163
2164         reload_nodes_file(rec->ctdb);
2165 }
2166
2167
2168 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
2169                               struct timeval yt, void *p)
2170 {
2171         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2172
2173         talloc_free(rec->ip_check_disable_ctx);
2174         rec->ip_check_disable_ctx = NULL;
2175 }
2176
2177
2178 static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
2179                                   struct timeval t, void *p)
2180 {
2181         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2182         struct ctdb_context *ctdb = rec->ctdb;
2183         int ret;
2184
2185         DEBUG(DEBUG_NOTICE,("Rebalance all nodes that have had ip assignment changes.\n"));
2186
2187         ret = ctdb_takeover_run(ctdb, rec->nodemap, NULL, NULL);
2188         if (ret != 0) {
2189                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
2190                 rec->need_takeover_run = true;
2191         }
2192
2193         talloc_free(rec->deferred_rebalance_ctx);
2194         rec->deferred_rebalance_ctx = NULL;
2195 }
2196
2197         
2198 static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2199                              TDB_DATA data, void *private_data)
2200 {
2201         uint32_t pnn;
2202         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2203
2204         if (data.dsize != sizeof(uint32_t)) {
2205                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2206                 return;
2207         }
2208
2209         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2210                 return;
2211         }
2212
2213         pnn = *(uint32_t *)&data.dptr[0];
2214
2215         lcp2_forcerebalance(ctdb, pnn);
2216         DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
2217
2218         if (rec->deferred_rebalance_ctx != NULL) {
2219                 talloc_free(rec->deferred_rebalance_ctx);
2220         }
2221         rec->deferred_rebalance_ctx = talloc_new(rec);
2222         event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
2223                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2224                         ctdb_rebalance_timeout, rec);
2225 }
2226
2227
2228
2229 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2230                              TDB_DATA data, void *private_data)
2231 {
2232         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2233         struct ctdb_public_ip *ip;
2234
2235         if (rec->recmaster != rec->ctdb->pnn) {
2236                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2237                 return;
2238         }
2239
2240         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2241                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2242                 return;
2243         }
2244
2245         ip = (struct ctdb_public_ip *)data.dptr;
2246
2247         update_ip_assignment_tree(rec->ctdb, ip);
2248 }
2249
2250
2251 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2252                              TDB_DATA data, void *private_data)
2253 {
2254         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2255         uint32_t timeout;
2256
2257         if (rec->ip_check_disable_ctx != NULL) {
2258                 talloc_free(rec->ip_check_disable_ctx);
2259                 rec->ip_check_disable_ctx = NULL;
2260         }
2261
2262         if (data.dsize != sizeof(uint32_t)) {
2263                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2264                                  "expexting %lu\n", (long unsigned)data.dsize,
2265                                  (long unsigned)sizeof(uint32_t)));
2266                 return;
2267         }
2268         if (data.dptr == NULL) {
2269                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
2270                 return;
2271         }
2272
2273         timeout = *((uint32_t *)data.dptr);
2274
2275         if (timeout == 0) {
2276                 DEBUG(DEBUG_NOTICE,("Reenabling ip check\n"));
2277                 return;
2278         }
2279                 
2280         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
2281
2282         rec->ip_check_disable_ctx = talloc_new(rec);
2283         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2284
2285         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2286 }
2287
2288
2289 /*
2290   handler for reload all ips.
2291 */
2292 static void ip_reloadall_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2293                              TDB_DATA data, void *private_data)
2294 {
2295         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2296
2297         if (data.dsize != sizeof(struct reloadips_all_reply)) {
2298                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2299                 return;
2300         }
2301
2302         reload_all_ips_request = (struct reloadips_all_reply *)talloc_steal(rec, data.dptr);
2303
2304         DEBUG(DEBUG_NOTICE,("RELOAD_ALL_IPS message received from node:%d srvid:%d\n", reload_all_ips_request->pnn, (int)reload_all_ips_request->srvid));
2305         return;
2306 }
2307
2308 static void async_reloadips_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2309 {
2310         uint32_t *status = callback_data;
2311
2312         if (res != 0) {
2313                 DEBUG(DEBUG_ERR,("Reload ips all failed on node %d\n", node_pnn));
2314                 *status = 1;
2315         }
2316 }
2317
2318 static int
2319 reload_all_ips(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, struct reloadips_all_reply *rips)
2320 {
2321         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2322         uint32_t *nodes;
2323         uint32_t status;
2324         int i;
2325
2326         DEBUG(DEBUG_ERR,("RELOAD ALL IPS on all active nodes\n"));
2327         for (i = 0; i< nodemap->num; i++) {
2328                 if (nodemap->nodes[i].flags != 0) {
2329                         DEBUG(DEBUG_ERR, ("Can not reload ips on all nodes. Node %d is not up and healthy\n", i));
2330                         talloc_free(tmp_ctx);
2331                         return -1;
2332                 }
2333         }
2334
2335         /* send the flags update to all connected nodes */
2336         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2337         status = 0;
2338         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
2339                                         nodes, 0,
2340                                         CONTROL_TIMEOUT(),
2341                                         false, tdb_null,
2342                                         async_reloadips_callback, NULL,
2343                                         &status) != 0) {
2344                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2345                 talloc_free(tmp_ctx);
2346                 return -1;
2347         }
2348
2349         if (status != 0) {
2350                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2351                 talloc_free(tmp_ctx);
2352                 return -1;
2353         }
2354
2355         ctdb_client_send_message(ctdb, rips->pnn, rips->srvid, tdb_null);
2356
2357         talloc_free(tmp_ctx);
2358         return 0;
2359 }
2360
2361
2362 /*
2363   handler for ip reallocate, just add it to the list of callers and 
2364   handle this later in the monitor_cluster loop so we do not recurse
2365   with other callers to takeover_run()
2366 */
2367 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2368                              TDB_DATA data, void *private_data)
2369 {
2370         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2371         struct ip_reallocate_list *caller;
2372
2373         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2374                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2375                 return;
2376         }
2377
2378         if (rec->ip_reallocate_ctx == NULL) {
2379                 rec->ip_reallocate_ctx = talloc_new(rec);
2380                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2381         }
2382
2383         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2384         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2385
2386         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2387         caller->next = rec->reallocate_callers;
2388         rec->reallocate_callers = caller;
2389
2390         return;
2391 }
2392
2393 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2394 {
2395         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2396         TDB_DATA result;
2397         int32_t ret;
2398         struct ip_reallocate_list *callers;
2399         uint32_t culprit;
2400
2401         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2402
2403         /* update the list of public ips that a node can handle for
2404            all connected nodes
2405         */
2406         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2407         if (ret != 0) {
2408                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2409                                  culprit));
2410                 rec->need_takeover_run = true;
2411         }
2412         if (ret == 0) {
2413                 ret = ctdb_takeover_run(ctdb, rec->nodemap, NULL, NULL);
2414                 if (ret != 0) {
2415                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2416                         rec->need_takeover_run = true;
2417                 }
2418         }
2419
2420         result.dsize = sizeof(int32_t);
2421         result.dptr  = (uint8_t *)&ret;
2422
2423         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2424
2425                 /* Someone that sent srvid==0 does not want a reply */
2426                 if (callers->rd->srvid == 0) {
2427                         continue;
2428                 }
2429                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2430                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2431                                   (unsigned long long)callers->rd->srvid));
2432                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2433                 if (ret != 0) {
2434                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2435                                          "message to %u:%llu\n",
2436                                          (unsigned)callers->rd->pnn,
2437                                          (unsigned long long)callers->rd->srvid));
2438                 }
2439         }
2440
2441         talloc_free(tmp_ctx);
2442         talloc_free(rec->ip_reallocate_ctx);
2443         rec->ip_reallocate_ctx = NULL;
2444         rec->reallocate_callers = NULL;
2445         
2446 }
2447
2448
2449 /*
2450   handler for recovery master elections
2451 */
2452 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2453                              TDB_DATA data, void *private_data)
2454 {
2455         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2456         int ret;
2457         struct election_message *em = (struct election_message *)data.dptr;
2458         TALLOC_CTX *mem_ctx;
2459
2460         /* we got an election packet - update the timeout for the election */
2461         talloc_free(rec->election_timeout);
2462         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2463                                                 fast_start ?
2464                                                 timeval_current_ofs(0, 500000) :
2465                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2466                                                 ctdb_election_timeout, rec);
2467
2468         mem_ctx = talloc_new(ctdb);
2469
2470         /* someone called an election. check their election data
2471            and if we disagree and we would rather be the elected node, 
2472            send a new election message to all other nodes
2473          */
2474         if (ctdb_election_win(rec, em)) {
2475                 if (!rec->send_election_te) {
2476                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2477                                                                 timeval_current_ofs(0, 500000),
2478                                                                 election_send_request, rec);
2479                 }
2480                 talloc_free(mem_ctx);
2481                 /*unban_all_nodes(ctdb);*/
2482                 return;
2483         }
2484         
2485         /* we didn't win */
2486         talloc_free(rec->send_election_te);
2487         rec->send_election_te = NULL;
2488
2489         if (ctdb->tunable.verify_recovery_lock != 0) {
2490                 /* release the recmaster lock */
2491                 if (em->pnn != ctdb->pnn &&
2492                     ctdb->recovery_lock_fd != -1) {
2493                         close(ctdb->recovery_lock_fd);
2494                         ctdb->recovery_lock_fd = -1;
2495                         unban_all_nodes(ctdb);
2496                 }
2497         }
2498
2499         /* ok, let that guy become recmaster then */
2500         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2501         if (ret != 0) {
2502                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2503                 talloc_free(mem_ctx);
2504                 return;
2505         }
2506
2507         talloc_free(mem_ctx);
2508         return;
2509 }
2510
2511
2512 /*
2513   force the start of the election process
2514  */
2515 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2516                            struct ctdb_node_map *nodemap)
2517 {
2518         int ret;
2519         struct ctdb_context *ctdb = rec->ctdb;
2520
2521         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2522
2523         /* set all nodes to recovery mode to stop all internode traffic */
2524         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2525         if (ret != 0) {
2526                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2527                 return;
2528         }
2529
2530         talloc_free(rec->election_timeout);
2531         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2532                                                 fast_start ?
2533                                                 timeval_current_ofs(0, 500000) :
2534                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2535                                                 ctdb_election_timeout, rec);
2536
2537         ret = send_election_request(rec, pnn, true);
2538         if (ret!=0) {
2539                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2540                 return;
2541         }
2542
2543         /* wait for a few seconds to collect all responses */
2544         ctdb_wait_election(rec);
2545 }
2546
2547
2548
2549 /*
2550   handler for when a node changes its flags
2551 */
2552 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2553                             TDB_DATA data, void *private_data)
2554 {
2555         int ret;
2556         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2557         struct ctdb_node_map *nodemap=NULL;
2558         TALLOC_CTX *tmp_ctx;
2559         int i;
2560         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2561         int disabled_flag_changed;
2562
2563         if (data.dsize != sizeof(*c)) {
2564                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2565                 return;
2566         }
2567
2568         tmp_ctx = talloc_new(ctdb);
2569         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2570
2571         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2572         if (ret != 0) {
2573                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2574                 talloc_free(tmp_ctx);
2575                 return;         
2576         }
2577
2578
2579         for (i=0;i<nodemap->num;i++) {
2580                 if (nodemap->nodes[i].pnn == c->pnn) break;
2581         }
2582
2583         if (i == nodemap->num) {
2584                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2585                 talloc_free(tmp_ctx);
2586                 return;
2587         }
2588
2589         if (nodemap->nodes[i].flags != c->new_flags) {
2590                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, nodemap->nodes[i].flags));
2591         }
2592
2593         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2594
2595         nodemap->nodes[i].flags = c->new_flags;
2596
2597         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2598                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2599
2600         if (ret == 0) {
2601                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2602                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2603         }
2604         
2605         if (ret == 0 &&
2606             ctdb->recovery_master == ctdb->pnn &&
2607             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2608                 /* Only do the takeover run if the perm disabled or unhealthy
2609                    flags changed since these will cause an ip failover but not
2610                    a recovery.
2611                    If the node became disconnected or banned this will also
2612                    lead to an ip address failover but that is handled 
2613                    during recovery
2614                 */
2615                 if (disabled_flag_changed) {
2616                         rec->need_takeover_run = true;
2617                 }
2618         }
2619
2620         talloc_free(tmp_ctx);
2621 }
2622
2623 /*
2624   handler for when we need to push out flag changes ot all other nodes
2625 */
2626 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2627                             TDB_DATA data, void *private_data)
2628 {
2629         int ret;
2630         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2631         struct ctdb_node_map *nodemap=NULL;
2632         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2633         uint32_t recmaster;
2634         uint32_t *nodes;
2635
2636         /* find the recovery master */
2637         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2638         if (ret != 0) {
2639                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2640                 talloc_free(tmp_ctx);
2641                 return;
2642         }
2643
2644         /* read the node flags from the recmaster */
2645         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2646         if (ret != 0) {
2647                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2648                 talloc_free(tmp_ctx);
2649                 return;
2650         }
2651         if (c->pnn >= nodemap->num) {
2652                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2653                 talloc_free(tmp_ctx);
2654                 return;
2655         }
2656
2657         /* send the flags update to all connected nodes */
2658         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2659
2660         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2661                                       nodes, 0, CONTROL_TIMEOUT(),
2662                                       false, data,
2663                                       NULL, NULL,
2664                                       NULL) != 0) {
2665                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2666
2667                 talloc_free(tmp_ctx);
2668                 return;
2669         }
2670
2671         talloc_free(tmp_ctx);
2672 }
2673
2674
2675 struct verify_recmode_normal_data {
2676         uint32_t count;
2677         enum monitor_result status;
2678 };
2679
2680 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2681 {
2682         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2683
2684
2685         /* one more node has responded with recmode data*/
2686         rmdata->count--;
2687
2688         /* if we failed to get the recmode, then return an error and let
2689            the main loop try again.
2690         */
2691         if (state->state != CTDB_CONTROL_DONE) {
2692                 if (rmdata->status == MONITOR_OK) {
2693                         rmdata->status = MONITOR_FAILED;
2694                 }
2695                 return;
2696         }
2697
2698         /* if we got a response, then the recmode will be stored in the
2699            status field
2700         */
2701         if (state->status != CTDB_RECOVERY_NORMAL) {
2702                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2703                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2704         }
2705
2706         return;
2707 }
2708
2709
2710 /* verify that all nodes are in normal recovery mode */
2711 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2712 {
2713         struct verify_recmode_normal_data *rmdata;
2714         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2715         struct ctdb_client_control_state *state;
2716         enum monitor_result status;
2717         int j;
2718         
2719         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2720         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2721         rmdata->count  = 0;
2722         rmdata->status = MONITOR_OK;
2723
2724         /* loop over all active nodes and send an async getrecmode call to 
2725            them*/
2726         for (j=0; j<nodemap->num; j++) {
2727                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2728                         continue;
2729                 }
2730                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2731                                         CONTROL_TIMEOUT(), 
2732                                         nodemap->nodes[j].pnn);
2733                 if (state == NULL) {
2734                         /* we failed to send the control, treat this as 
2735                            an error and try again next iteration
2736                         */                      
2737                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2738                         talloc_free(mem_ctx);
2739                         return MONITOR_FAILED;
2740                 }
2741
2742                 /* set up the callback functions */
2743                 state->async.fn = verify_recmode_normal_callback;
2744                 state->async.private_data = rmdata;
2745
2746                 /* one more control to wait for to complete */
2747                 rmdata->count++;
2748         }
2749
2750
2751         /* now wait for up to the maximum number of seconds allowed
2752            or until all nodes we expect a response from has replied
2753         */
2754         while (rmdata->count > 0) {
2755                 event_loop_once(ctdb->ev);
2756         }
2757
2758         status = rmdata->status;
2759         talloc_free(mem_ctx);
2760         return status;
2761 }
2762
2763
2764 struct verify_recmaster_data {
2765         struct ctdb_recoverd *rec;
2766         uint32_t count;
2767         uint32_t pnn;
2768         enum monitor_result status;
2769 };
2770
2771 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2772 {
2773         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2774
2775
2776         /* one more node has responded with recmaster data*/
2777         rmdata->count--;
2778
2779         /* if we failed to get the recmaster, then return an error and let
2780            the main loop try again.
2781         */
2782         if (state->state != CTDB_CONTROL_DONE) {
2783                 if (rmdata->status == MONITOR_OK) {
2784                         rmdata->status = MONITOR_FAILED;
2785                 }
2786                 return;
2787         }
2788
2789         /* if we got a response, then the recmaster will be stored in the
2790            status field
2791         */
2792         if (state->status != rmdata->pnn) {
2793                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2794                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2795                 rmdata->status = MONITOR_ELECTION_NEEDED;
2796         }
2797
2798         return;
2799 }
2800
2801
2802 /* verify that all nodes agree that we are the recmaster */
2803 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2804 {
2805         struct ctdb_context *ctdb = rec->ctdb;
2806         struct verify_recmaster_data *rmdata;
2807         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2808         struct ctdb_client_control_state *state;
2809         enum monitor_result status;
2810         int j;
2811         
2812         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2813         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2814         rmdata->rec    = rec;
2815         rmdata->count  = 0;
2816         rmdata->pnn    = pnn;
2817         rmdata->status = MONITOR_OK;
2818
2819         /* loop over all active nodes and send an async getrecmaster call to 
2820            them*/
2821         for (j=0; j<nodemap->num; j++) {
2822                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2823                         continue;
2824                 }
2825                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2826                                         CONTROL_TIMEOUT(),
2827                                         nodemap->nodes[j].pnn);
2828                 if (state == NULL) {
2829                         /* we failed to send the control, treat this as 
2830                            an error and try again next iteration
2831                         */                      
2832                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2833                         talloc_free(mem_ctx);
2834                         return MONITOR_FAILED;
2835                 }
2836
2837                 /* set up the callback functions */
2838                 state->async.fn = verify_recmaster_callback;
2839                 state->async.private_data = rmdata;
2840
2841                 /* one more control to wait for to complete */
2842                 rmdata->count++;
2843         }
2844
2845
2846         /* now wait for up to the maximum number of seconds allowed
2847            or until all nodes we expect a response from has replied
2848         */
2849         while (rmdata->count > 0) {
2850                 event_loop_once(ctdb->ev);
2851         }
2852
2853         status = rmdata->status;
2854         talloc_free(mem_ctx);
2855         return status;
2856 }
2857
2858 static bool interfaces_have_changed(struct ctdb_context *ctdb,
2859                                     struct ctdb_recoverd *rec)
2860 {
2861         struct ctdb_control_get_ifaces *ifaces = NULL;
2862         TALLOC_CTX *mem_ctx;
2863         bool ret = false;
2864
2865         mem_ctx = talloc_new(NULL);
2866
2867         /* Read the interfaces from the local node */
2868         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
2869                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
2870                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
2871                 /* We could return an error.  However, this will be
2872                  * rare so we'll decide that the interfaces have
2873                  * actually changed, just in case.
2874                  */
2875                 talloc_free(mem_ctx);
2876                 return true;
2877         }
2878
2879         if (!rec->ifaces) {
2880                 /* We haven't been here before so things have changed */
2881                 ret = true;
2882         } else if (rec->ifaces->num != ifaces->num) {
2883                 /* Number of interfaces has changed */
2884                 ret = true;
2885         } else {
2886                 /* See if interface names or link states have changed */
2887                 int i;
2888                 for (i = 0; i < rec->ifaces->num; i++) {
2889                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
2890                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0 ||
2891                             iface->link_state != ifaces->ifaces[i].link_state) {
2892                                 ret = true;
2893                                 break;
2894                         }
2895                 }
2896         }
2897
2898         talloc_free(rec->ifaces);
2899         rec->ifaces = talloc_steal(rec, ifaces);
2900
2901         talloc_free(mem_ctx);
2902         return ret;
2903 }
2904
2905 /* called to check that the local allocation of public ip addresses is ok.
2906 */
2907 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2908 {
2909         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2910         struct ctdb_uptime *uptime1 = NULL;
2911         struct ctdb_uptime *uptime2 = NULL;
2912         int ret, j;
2913         bool need_takeover_run = false;
2914
2915         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2916                                 CTDB_CURRENT_NODE, &uptime1);
2917         if (ret != 0) {
2918                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2919                 talloc_free(mem_ctx);
2920                 return -1;
2921         }
2922
2923         if (interfaces_have_changed(ctdb, rec)) {
2924                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2925                                      "local node %u - force takeover run\n",
2926                                      pnn));
2927                 need_takeover_run = true;
2928         }
2929
2930         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2931                                 CTDB_CURRENT_NODE, &uptime2);
2932         if (ret != 0) {
2933                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2934                 talloc_free(mem_ctx);
2935                 return -1;
2936         }
2937
2938         /* skip the check if the startrecovery time has changed */
2939         if (timeval_compare(&uptime1->last_recovery_started,
2940                             &uptime2->last_recovery_started) != 0) {
2941                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2942                 talloc_free(mem_ctx);
2943                 return 0;
2944         }
2945
2946         /* skip the check if the endrecovery time has changed */
2947         if (timeval_compare(&uptime1->last_recovery_finished,
2948                             &uptime2->last_recovery_finished) != 0) {
2949                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2950                 talloc_free(mem_ctx);
2951                 return 0;
2952         }
2953
2954         /* skip the check if we have started but not finished recovery */
2955         if (timeval_compare(&uptime1->last_recovery_finished,
2956                             &uptime1->last_recovery_started) != 1) {
2957                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2958                 talloc_free(mem_ctx);
2959
2960                 return 0;
2961         }
2962
2963         /* verify that we have the ip addresses we should have
2964            and we dont have ones we shouldnt have.
2965            if we find an inconsistency we set recmode to
2966            active on the local node and wait for the recmaster
2967            to do a full blown recovery.
2968            also if the pnn is -1 and we are healthy and can host the ip
2969            we also request a ip reallocation.
2970         */
2971         if (ctdb->tunable.disable_ip_failover == 0) {
2972                 struct ctdb_all_public_ips *ips = NULL;
2973
2974                 /* read the *available* IPs from the local node */
2975                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
2976                 if (ret != 0) {
2977                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
2978                         talloc_free(mem_ctx);
2979                         return -1;
2980                 }
2981
2982                 for (j=0; j<ips->num; j++) {
2983                         if (ips->ips[j].pnn == -1 &&
2984                             nodemap->nodes[pnn].flags == 0) {
2985                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
2986                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
2987                                 need_takeover_run = true;
2988                         }
2989                 }
2990
2991                 talloc_free(ips);
2992
2993                 /* read the *known* IPs from the local node */
2994                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
2995                 if (ret != 0) {
2996                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
2997                         talloc_free(mem_ctx);
2998                         return -1;
2999                 }
3000
3001                 for (j=0; j<ips->num; j++) {
3002                         if (ips->ips[j].pnn == pnn) {
3003                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3004                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3005                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3006                                         need_takeover_run = true;
3007                                 }
3008                         } else {
3009                                 if (ctdb->do_checkpublicip &&
3010                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3011
3012                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3013                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3014
3015                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3016                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3017                                         }
3018                                 }
3019                         }
3020                 }
3021         }
3022
3023         if (need_takeover_run) {
3024                 struct takeover_run_reply rd;
3025                 TDB_DATA data;
3026
3027                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3028
3029                 rd.pnn = ctdb->pnn;
3030                 rd.srvid = 0;
3031                 data.dptr = (uint8_t *)&rd;
3032                 data.dsize = sizeof(rd);
3033
3034                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3035                 if (ret != 0) {
3036                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3037                 }
3038         }
3039         talloc_free(mem_ctx);
3040         return 0;
3041 }
3042
3043
3044 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3045 {
3046         struct ctdb_node_map **remote_nodemaps = callback_data;
3047
3048         if (node_pnn >= ctdb->num_nodes) {
3049                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3050                 return;
3051         }
3052
3053         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3054
3055 }
3056
3057 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3058         struct ctdb_node_map *nodemap,
3059         struct ctdb_node_map **remote_nodemaps)
3060 {
3061         uint32_t *nodes;
3062
3063         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3064         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3065                                         nodes, 0,
3066                                         CONTROL_TIMEOUT(), false, tdb_null,
3067                                         async_getnodemap_callback,
3068                                         NULL,
3069                                         remote_nodemaps) != 0) {
3070                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3071
3072                 return -1;
3073         }
3074
3075         return 0;
3076 }
3077
3078 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
3079 struct ctdb_check_reclock_state {
3080         struct ctdb_context *ctdb;
3081         struct timeval start_time;
3082         int fd[2];
3083         pid_t child;
3084         struct timed_event *te;
3085         struct fd_event *fde;
3086         enum reclock_child_status status;
3087 };
3088
3089 /* when we free the reclock state we must kill any child process.
3090 */
3091 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
3092 {
3093         struct ctdb_context *ctdb = state->ctdb;
3094
3095         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
3096
3097         if (state->fd[0] != -1) {
3098                 close(state->fd[0]);
3099                 state->fd[0] = -1;
3100         }
3101         if (state->fd[1] != -1) {
3102                 close(state->fd[1]);
3103                 state->fd[1] = -1;
3104         }
3105         ctdb_kill(ctdb, state->child, SIGKILL);
3106         return 0;
3107 }
3108
3109 /*
3110   called if our check_reclock child times out. this would happen if
3111   i/o to the reclock file blocks.
3112  */
3113 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
3114                                          struct timeval t, void *private_data)
3115 {
3116         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
3117                                            struct ctdb_check_reclock_state);
3118
3119         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
3120         state->status = RECLOCK_TIMEOUT;
3121 }
3122
3123 /* this is called when the child process has completed checking the reclock
3124    file and has written data back to us through the pipe.
3125 */
3126 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
3127                              uint16_t flags, void *private_data)
3128 {
3129         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
3130                                              struct ctdb_check_reclock_state);
3131         char c = 0;
3132         int ret;
3133
3134         /* we got a response from our child process so we can abort the
3135            timeout.
3136         */
3137         talloc_free(state->te);
3138         state->te = NULL;
3139
3140         ret = read(state->fd[0], &c, 1);
3141         if (ret != 1 || c != RECLOCK_OK) {
3142                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3143                 state->status = RECLOCK_FAILED;
3144
3145                 return;
3146         }
3147
3148         state->status = RECLOCK_OK;
3149         return;
3150 }
3151
3152 static int check_recovery_lock(struct ctdb_context *ctdb)
3153 {
3154         int ret;
3155         struct ctdb_check_reclock_state *state;
3156         pid_t parent = getpid();
3157
3158         if (ctdb->recovery_lock_fd == -1) {
3159                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3160                 return -1;
3161         }
3162
3163         state = talloc(ctdb, struct ctdb_check_reclock_state);
3164         CTDB_NO_MEMORY(ctdb, state);
3165
3166         state->ctdb = ctdb;
3167         state->start_time = timeval_current();
3168         state->status = RECLOCK_CHECKING;
3169         state->fd[0] = -1;
3170         state->fd[1] = -1;
3171
3172         ret = pipe(state->fd);
3173         if (ret != 0) {
3174                 talloc_free(state);
3175                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3176                 return -1;
3177         }
3178
3179         state->child = ctdb_fork(ctdb);
3180         if (state->child == (pid_t)-1) {
3181                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3182                 close(state->fd[0]);
3183                 state->fd[0] = -1;
3184                 close(state->fd[1]);
3185                 state->fd[1] = -1;
3186                 talloc_free(state);
3187                 return -1;
3188         }
3189
3190         if (state->child == 0) {
3191                 char cc = RECLOCK_OK;
3192                 close(state->fd[0]);
3193                 state->fd[0] = -1;
3194
3195                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3196                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3197                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3198                         cc = RECLOCK_FAILED;
3199                 }
3200
3201                 write(state->fd[1], &cc, 1);
3202                 /* make sure we die when our parent dies */
3203                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3204                         sleep(5);
3205                         write(state->fd[1], &cc, 1);
3206                 }
3207                 _exit(0);
3208         }
3209         close(state->fd[1]);
3210         state->fd[1] = -1;
3211         set_close_on_exec(state->fd[0]);
3212
3213         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3214
3215         talloc_set_destructor(state, check_reclock_destructor);
3216
3217         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3218                                     ctdb_check_reclock_timeout, state);
3219         if (state->te == NULL) {
3220                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3221                 talloc_free(state);
3222                 return -1;
3223         }
3224
3225         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3226                                 EVENT_FD_READ,
3227                                 reclock_child_handler,
3228                                 (void *)state);
3229
3230         if (state->fde == NULL) {
3231                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3232                 talloc_free(state);
3233                 return -1;
3234         }
3235         tevent_fd_set_auto_close(state->fde);
3236
3237         while (state->status == RECLOCK_CHECKING) {
3238                 event_loop_once(ctdb->ev);
3239         }
3240
3241         if (state->status == RECLOCK_FAILED) {
3242                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3243                 close(ctdb->recovery_lock_fd);
3244                 ctdb->recovery_lock_fd = -1;
3245                 talloc_free(state);
3246                 return -1;
3247         }
3248
3249         talloc_free(state);
3250         return 0;
3251 }
3252
3253 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3254 {
3255         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3256         const char *reclockfile;
3257
3258         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3259                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3260                 talloc_free(tmp_ctx);
3261                 return -1;      
3262         }
3263
3264         if (reclockfile == NULL) {
3265                 if (ctdb->recovery_lock_file != NULL) {
3266                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3267                         talloc_free(ctdb->recovery_lock_file);
3268                         ctdb->recovery_lock_file = NULL;
3269                         if (ctdb->recovery_lock_fd != -1) {
3270                                 close(ctdb->recovery_lock_fd);
3271                                 ctdb->recovery_lock_fd = -1;
3272                         }
3273                 }
3274                 ctdb->tunable.verify_recovery_lock = 0;
3275                 talloc_free(tmp_ctx);
3276                 return 0;
3277         }
3278
3279         if (ctdb->recovery_lock_file == NULL) {
3280                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3281                 if (ctdb->recovery_lock_fd != -1) {
3282                         close(ctdb->recovery_lock_fd);
3283                         ctdb->recovery_lock_fd = -1;
3284                 }
3285                 talloc_free(tmp_ctx);
3286                 return 0;
3287         }
3288
3289
3290         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3291                 talloc_free(tmp_ctx);
3292                 return 0;
3293         }
3294
3295         talloc_free(ctdb->recovery_lock_file);
3296         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3297         ctdb->tunable.verify_recovery_lock = 0;
3298         if (ctdb->recovery_lock_fd != -1) {
3299                 close(ctdb->recovery_lock_fd);
3300                 ctdb->recovery_lock_fd = -1;
3301         }
3302
3303         talloc_free(tmp_ctx);
3304         return 0;
3305 }
3306
3307 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3308                       TALLOC_CTX *mem_ctx)
3309 {
3310         uint32_t pnn;
3311         struct ctdb_node_map *nodemap=NULL;
3312         struct ctdb_node_map *recmaster_nodemap=NULL;
3313         struct ctdb_node_map **remote_nodemaps=NULL;
3314         struct ctdb_vnn_map *vnnmap=NULL;
3315         struct ctdb_vnn_map *remote_vnnmap=NULL;
3316         int32_t debug_level;
3317         int i, j, ret;
3318
3319
3320
3321         /* verify that the main daemon is still running */
3322         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3323                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3324                 exit(-1);
3325         }
3326
3327         /* ping the local daemon to tell it we are alive */
3328         ctdb_ctrl_recd_ping(ctdb);
3329
3330         if (rec->election_timeout) {
3331                 /* an election is in progress */
3332                 return;
3333         }
3334
3335         /* read the debug level from the parent and update locally */
3336         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3337         if (ret !=0) {
3338                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3339                 return;
3340         }
3341         LogLevel = debug_level;
3342
3343
3344         /* We must check if we need to ban a node here but we want to do this
3345            as early as possible so we dont wait until we have pulled the node
3346            map from the local node. thats why we have the hardcoded value 20
3347         */
3348         for (i=0; i<ctdb->num_nodes; i++) {
3349                 struct ctdb_banning_state *ban_state;
3350
3351                 if (ctdb->nodes[i]->ban_state == NULL) {
3352                         continue;
3353                 }
3354                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
3355                 if (ban_state->count < 20) {
3356                         continue;
3357                 }
3358                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
3359                         ctdb->nodes[i]->pnn, ban_state->count,
3360                         ctdb->tunable.recovery_ban_period));
3361                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
3362                 ban_state->count = 0;
3363         }
3364
3365         /* get relevant tunables */
3366         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3367         if (ret != 0) {
3368                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3369                 return;
3370         }
3371
3372         /* get the current recovery lock file from the server */
3373         if (update_recovery_lock_file(ctdb) != 0) {
3374                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3375                 return;
3376         }
3377
3378         /* Make sure that if recovery lock verification becomes disabled when
3379            we close the file
3380         */
3381         if (ctdb->tunable.verify_recovery_lock == 0) {
3382                 if (ctdb->recovery_lock_fd != -1) {
3383                         close(ctdb->recovery_lock_fd);
3384                         ctdb->recovery_lock_fd = -1;
3385                 }
3386         }
3387
3388         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3389         if (pnn == (uint32_t)-1) {
3390                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
3391                 return;
3392         }
3393
3394         /* get the vnnmap */
3395         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3396         if (ret != 0) {
3397                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3398                 return;
3399         }
3400
3401
3402         /* get number of nodes */
3403         if (rec->nodemap) {
3404                 talloc_free(rec->nodemap);
3405                 rec->nodemap = NULL;
3406                 nodemap=NULL;
3407         }
3408         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3409         if (ret != 0) {
3410                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3411                 return;
3412         }
3413         nodemap = rec->nodemap;
3414
3415         /* update the capabilities for all nodes */
3416         ret = update_capabilities(ctdb, nodemap);
3417         if (ret != 0) {
3418                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3419                 return;
3420         }
3421
3422         /* check which node is the recovery master */
3423         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3424         if (ret != 0) {
3425                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3426                 return;
3427         }
3428
3429         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3430         if (rec->recmaster != pnn) {
3431                 if (rec->ip_reallocate_ctx != NULL) {
3432                         talloc_free(rec->ip_reallocate_ctx);
3433                         rec->ip_reallocate_ctx = NULL;
3434                         rec->reallocate_callers = NULL;
3435                 }
3436         }
3437
3438         if (rec->recmaster == (uint32_t)-1) {
3439                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3440                 force_election(rec, pnn, nodemap);
3441                 return;
3442         }
3443
3444         /* if the local daemon is STOPPED, we verify that the databases are
3445            also frozen and thet the recmode is set to active 
3446         */
3447         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3448                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3449                 if (ret != 0) {
3450                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3451                 }
3452                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3453                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3454
3455                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3456                         if (ret != 0) {
3457                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED state\n"));
3458                                 return;
3459                         }
3460                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3461                         if (ret != 0) {
3462                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED state\n"));
3463
3464                                 return;
3465                         }
3466                         return;
3467                 }
3468         }
3469         /* If the local node is stopped, verify we are not the recmaster 
3470            and yield this role if so
3471         */
3472         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE) && (rec->recmaster == pnn)) {
3473                 DEBUG(DEBUG_ERR,("Local node is INACTIVE. Yielding recmaster role\n"));
3474                 force_election(rec, pnn, nodemap);
3475                 return;
3476         }
3477         
3478         /*
3479          * if the current recmaster do not have CTDB_CAP_RECMASTER,
3480          * but we have force an election and try to become the new
3481          * recmaster
3482          */
3483         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3484             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3485              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3486                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3487                                   " but we (node %u) have - force an election\n",
3488                                   rec->recmaster, pnn));
3489                 force_election(rec, pnn, nodemap);
3490                 return;
3491         }
3492
3493         /* check that we (recovery daemon) and the local ctdb daemon
3494            agrees on whether we are banned or not
3495         */
3496
3497         /* remember our own node flags */
3498         rec->node_flags = nodemap->nodes[pnn].flags;
3499
3500         /* count how many active nodes there are */
3501         rec->num_active    = 0;
3502         rec->num_connected = 0;
3503         for (i=0; i<nodemap->num; i++) {
3504                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3505                         rec->num_active++;
3506                 }
3507                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3508                         rec->num_connected++;
3509                 }
3510         }
3511
3512
3513         /* verify that the recmaster node is still active */
3514         for (j=0; j<nodemap->num; j++) {
3515                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3516                         break;
3517                 }
3518         }
3519
3520         if (j == nodemap->num) {
3521                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3522                 force_election(rec, pnn, nodemap);
3523                 return;
3524         }
3525
3526         /* if recovery master is disconnected we must elect a new recmaster */
3527         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3528                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3529                 force_election(rec, pnn, nodemap);
3530                 return;
3531         }
3532
3533         /* get nodemap from the recovery master to check if it is inactive */
3534         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3535                                    mem_ctx, &recmaster_nodemap);
3536         if (ret != 0) {
3537                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3538                           nodemap->nodes[j].pnn));
3539                 return;
3540         }
3541
3542
3543         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3544             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3545                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3546                 force_election(rec, pnn, nodemap);
3547                 return;
3548         }
3549
3550         /* If this node is stopped then it is not the recovery master
3551          * so the only remaining action is to potentially to verify
3552          * the local IP allocation below.  This won't accomplish
3553          * anything useful so skip it.
3554          */
3555         if (rec->node_flags & NODE_FLAGS_STOPPED) {
3556                 return;
3557         }
3558
3559         /* verify that we have all ip addresses we should have and we dont
3560          * have addresses we shouldnt have.
3561          */ 
3562         if (ctdb->tunable.disable_ip_failover == 0) {
3563                 if (rec->ip_check_disable_ctx == NULL) {
3564                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3565                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3566                         }
3567                 }
3568         }
3569
3570
3571         /* if we are not the recmaster then we do not need to check
3572            if recovery is needed
3573          */
3574         if (pnn != rec->recmaster) {
3575                 return;
3576         }
3577
3578
3579         /* ensure our local copies of flags are right */
3580         ret = update_local_flags(rec, nodemap);
3581         if (ret == MONITOR_ELECTION_NEEDED) {
3582                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3583                 force_election(rec, pnn, nodemap);
3584                 return;
3585         }
3586         if (ret != MONITOR_OK) {
3587                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3588                 return;
3589         }
3590
3591         if (ctdb->num_nodes != nodemap->num) {
3592                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3593                 reload_nodes_file(ctdb);
3594                 return;
3595         }
3596
3597         /* verify that all active nodes agree that we are the recmaster */
3598         switch (verify_recmaster(rec, nodemap, pnn)) {
3599         case MONITOR_RECOVERY_NEEDED:
3600                 /* can not happen */
3601                 return;
3602         case MONITOR_ELECTION_NEEDED:
3603                 force_election(rec, pnn, nodemap);
3604                 return;
3605         case MONITOR_OK:
3606                 break;
3607         case MONITOR_FAILED:
3608                 return;
3609         }
3610
3611
3612         if (rec->need_recovery) {
3613                 /* a previous recovery didn't finish */
3614                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3615                 return;
3616         }
3617
3618         /* verify that all active nodes are in normal mode 
3619            and not in recovery mode 
3620         */
3621         switch (verify_recmode(ctdb, nodemap)) {
3622         case MONITOR_RECOVERY_NEEDED:
3623                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3624                 return;
3625         case MONITOR_FAILED:
3626                 return;
3627         case MONITOR_ELECTION_NEEDED:
3628                 /* can not happen */
3629         case MONITOR_OK:
3630                 break;
3631         }
3632
3633
3634         if (ctdb->tunable.verify_recovery_lock != 0) {
3635                 /* we should have the reclock - check its not stale */
3636                 ret = check_recovery_lock(ctdb);
3637                 if (ret != 0) {
3638                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3639                         ctdb_set_culprit(rec, ctdb->pnn);
3640                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3641                         return;
3642                 }
3643         }
3644
3645
3646         /* is there a pending reload all ips ? */
3647         if (reload_all_ips_request != NULL) {
3648                 reload_all_ips(ctdb, rec, nodemap, reload_all_ips_request);
3649                 talloc_free(reload_all_ips_request);
3650                 reload_all_ips_request = NULL;
3651         }
3652
3653         /* if there are takeovers requested, perform it and notify the waiters */
3654         if (rec->reallocate_callers) {
3655                 process_ipreallocate_requests(ctdb, rec);
3656         }
3657
3658         /* get the nodemap for all active remote nodes
3659          */
3660         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3661         if (remote_nodemaps == NULL) {
3662                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3663                 return;
3664         }
3665         for(i=0; i<nodemap->num; i++) {
3666                 remote_nodemaps[i] = NULL;
3667         }
3668         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3669                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3670                 return;
3671         } 
3672
3673         /* verify that all other nodes have the same nodemap as we have
3674         */
3675         for (j=0; j<nodemap->num; j++) {
3676                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3677                         continue;
3678                 }
3679
3680                 if (remote_nodemaps[j] == NULL) {
3681                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3682                         ctdb_set_culprit(rec, j);
3683
3684                         return;
3685                 }
3686
3687                 /* if the nodes disagree on how many nodes there are
3688                    then this is a good reason to try recovery
3689                  */
3690                 if (remote_nodemaps[j]->num != nodemap->num) {
3691                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3692                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3693                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3694                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3695                         return;
3696                 }
3697
3698                 /* if the nodes disagree on which nodes exist and are
3699                    active, then that is also a good reason to do recovery
3700                  */
3701                 for (i=0;i<nodemap->num;i++) {
3702                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3703                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3704                                           nodemap->nodes[j].pnn, i, 
3705                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3706                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3707                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3708                                             vnnmap);
3709                                 return;
3710                         }
3711                 }
3712
3713                 /* verify the flags are consistent
3714                 */
3715                 for (i=0; i<nodemap->num; i++) {
3716                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3717                                 continue;
3718                         }
3719                         
3720                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3721                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3722                                   nodemap->nodes[j].pnn, 
3723                                   nodemap->nodes[i].pnn, 
3724                                   remote_nodemaps[j]->nodes[i].flags,
3725                                   nodemap->nodes[i].flags));
3726                                 if (i == j) {
3727                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3728                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3729                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3730                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3731                                                     vnnmap);
3732                                         return;
3733                                 } else {
3734                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3735                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3736                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3737                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3738                                                     vnnmap);
3739                                         return;
3740                                 }
3741                         }
3742                 }
3743         }
3744
3745
3746         /* there better be the same number of lmasters in the vnn map
3747            as there are active nodes or we will have to do a recovery
3748          */
3749         if (vnnmap->size != rec->num_active) {
3750                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3751                           vnnmap->size, rec->num_active));
3752                 ctdb_set_culprit(rec, ctdb->pnn);
3753                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3754                 return;
3755         }
3756
3757         /* verify that all active nodes in the nodemap also exist in 
3758            the vnnmap.
3759          */
3760         for (j=0; j<nodemap->num; j++) {
3761                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3762                         continue;
3763                 }
3764                 if (nodemap->nodes[j].pnn == pnn) {
3765                         continue;
3766                 }
3767
3768                 for (i=0; i<vnnmap->size; i++) {
3769                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3770                                 break;
3771                         }
3772                 }
3773                 if (i == vnnmap->size) {
3774                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3775                                   nodemap->nodes[j].pnn));
3776                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3777                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3778                         return;
3779                 }
3780         }
3781
3782         
3783         /* verify that all other nodes have the same vnnmap
3784            and are from the same generation
3785          */
3786         for (j=0; j<nodemap->num; j++) {
3787                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3788                         continue;
3789                 }
3790                 if (nodemap->nodes[j].pnn == pnn) {
3791                         continue;
3792                 }
3793
3794                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3795                                           mem_ctx, &remote_vnnmap);
3796                 if (ret != 0) {
3797                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3798                                   nodemap->nodes[j].pnn));
3799                         return;
3800                 }
3801
3802                 /* verify the vnnmap generation is the same */
3803                 if (vnnmap->generation != remote_vnnmap->generation) {
3804                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3805                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3806                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3807                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3808                         return;
3809                 }
3810
3811                 /* verify the vnnmap size is the same */
3812                 if (vnnmap->size != remote_vnnmap->size) {
3813                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3814                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3815                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3816                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3817                         return;
3818                 }
3819
3820                 /* verify the vnnmap is the same */
3821                 for (i=0;i<vnnmap->size;i++) {
3822                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3823                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3824                                           nodemap->nodes[j].pnn));
3825                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3826                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3827                                             vnnmap);
3828                                 return;
3829                         }
3830                 }
3831         }
3832
3833         /* we might need to change who has what IP assigned */
3834         if (rec->need_takeover_run) {
3835                 uint32_t culprit = (uint32_t)-1;
3836
3837                 rec->need_takeover_run = false;
3838
3839                 /* update the list of public ips that a node can handle for
3840                    all connected nodes
3841                 */
3842                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3843                 if (ret != 0) {
3844                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3845                                          culprit));
3846                         rec->need_takeover_run = true;
3847                         return;
3848                 }
3849
3850                 /* execute the "startrecovery" event script on all nodes */
3851                 ret = run_startrecovery_eventscript(rec, nodemap);
3852                 if (ret!=0) {
3853                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3854                         ctdb_set_culprit(rec, ctdb->pnn);
3855                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3856                         return;
3857                 }
3858
3859                 /* If takeover run fails, then the offending nodes are
3860                  * assigned ban culprit counts. And we re-try takeover.
3861                  * If takeover run fails repeatedly, the node would get
3862                  * banned.
3863                  *
3864                  * If rec->need_takeover_run is not set to true at this
3865                  * failure, monitoring is disabled cluster-wide (via
3866                  * startrecovery eventscript) and will not get enabled.
3867                  */
3868                 ret = ctdb_takeover_run(ctdb, nodemap, takeover_fail_callback, rec);
3869                 if (ret != 0) {
3870                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Trying again\n"));
3871                         return;
3872                 }
3873
3874                 /* execute the "recovered" event script on all nodes */
3875                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
3876 #if 0
3877 // we cant check whether the event completed successfully
3878 // since this script WILL fail if the node is in recovery mode
3879 // and if that race happens, the code here would just cause a second
3880 // cascading recovery.
3881                 if (ret!=0) {
3882                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3883                         ctdb_set_culprit(rec, ctdb->pnn);
3884                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3885                 }
3886 #endif
3887         }
3888 }
3889
3890 /*
3891   the main monitoring loop
3892  */
3893 static void monitor_cluster(struct ctdb_context *ctdb)
3894 {
3895         struct ctdb_recoverd *rec;
3896
3897         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3898
3899         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3900         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3901
3902         rec->ctdb = ctdb;
3903
3904         rec->priority_time = timeval_current();
3905
3906         /* register a message port for sending memory dumps */
3907         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3908
3909         /* register a message port for requesting logs */
3910         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_GETLOG, getlog_handler, rec);
3911
3912         /* register a message port for clearing logs */
3913         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_CLEARLOG, clearlog_handler, rec);
3914
3915         /* register a message port for recovery elections */
3916         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3917
3918         /* when nodes are disabled/enabled */
3919         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3920
3921         /* when we are asked to puch out a flag change */
3922         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3923
3924         /* register a message port for vacuum fetch */
3925         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3926
3927         /* register a message port for reloadnodes  */
3928         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3929
3930         /* register a message port for performing a takeover run */
3931         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3932
3933         /* register a message port for performing a reload all ips */
3934         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_ALL_IPS, ip_reloadall_handler, rec);
3935
3936         /* register a message port for disabling the ip check for a short while */
3937         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3938
3939         /* register a message port for updating the recovery daemons node assignment for an ip */
3940         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3941
3942         /* register a message port for forcing a rebalance of a node next
3943            reallocation */
3944         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3945
3946         for (;;) {
3947                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3948                 struct timeval start;
3949                 double elapsed;
3950
3951                 if (!mem_ctx) {
3952                         DEBUG(DEBUG_CRIT,(__location__
3953                                           " Failed to create temp context\n"));
3954                         exit(-1);
3955                 }
3956
3957                 start = timeval_current();
3958                 main_loop(ctdb, rec, mem_ctx);
3959                 talloc_free(mem_ctx);
3960
3961                 /* we only check for recovery once every second */
3962                 elapsed = timeval_elapsed(&start);
3963                 if (elapsed < ctdb->tunable.recover_interval) {
3964                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3965                                           - elapsed);
3966                 }
3967         }
3968 }
3969
3970 /*
3971   event handler for when the main ctdbd dies
3972  */
3973 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3974                                  uint16_t flags, void *private_data)
3975 {
3976         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3977         _exit(1);
3978 }
3979
3980 /*
3981   called regularly to verify that the recovery daemon is still running
3982  */
3983 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3984                               struct timeval yt, void *p)
3985 {
3986         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3987
3988         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3989                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3990
3991                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3992                                 ctdb_restart_recd, ctdb);
3993
3994                 return;
3995         }
3996
3997         event_add_timed(ctdb->ev, ctdb->recd_ctx,
3998                         timeval_current_ofs(30, 0),
3999                         ctdb_check_recd, ctdb);
4000 }
4001
4002 static void recd_sig_child_handler(struct event_context *ev,
4003         struct signal_event *se, int signum, int count,
4004         void *dont_care, 
4005         void *private_data)
4006 {
4007 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4008         int status;
4009         pid_t pid = -1;
4010
4011         while (pid != 0) {
4012                 pid = waitpid(-1, &status, WNOHANG);
4013                 if (pid == -1) {
4014                         if (errno != ECHILD) {
4015                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4016                         }
4017                         return;
4018                 }
4019                 if (pid > 0) {
4020                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4021                 }
4022         }
4023 }
4024
4025 /*
4026   startup the recovery daemon as a child of the main ctdb daemon
4027  */
4028 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4029 {
4030         int fd[2];
4031         struct signal_event *se;
4032         struct tevent_fd *fde;
4033
4034         if (pipe(fd) != 0) {
4035                 return -1;
4036         }
4037
4038         ctdb->ctdbd_pid = getpid();
4039
4040         ctdb->recoverd_pid = ctdb_fork_no_free_ringbuffer(ctdb);
4041         if (ctdb->recoverd_pid == -1) {
4042                 return -1;
4043         }
4044
4045         if (ctdb->recoverd_pid != 0) {
4046                 talloc_free(ctdb->recd_ctx);
4047                 ctdb->recd_ctx = talloc_new(ctdb);
4048                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4049
4050                 close(fd[0]);
4051                 event_add_timed(ctdb->ev, ctdb->recd_ctx,
4052                                 timeval_current_ofs(30, 0),
4053                                 ctdb_check_recd, ctdb);
4054                 return 0;
4055         }
4056
4057         close(fd[1]);
4058
4059         srandom(getpid() ^ time(NULL));
4060
4061         /* Clear the log ringbuffer */
4062         ctdb_clear_log(ctdb);
4063
4064         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4065                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
4066                 exit(1);
4067         }
4068
4069         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4070
4071         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
4072                      ctdb_recoverd_parent, &fd[0]);
4073         tevent_fd_set_auto_close(fde);
4074
4075         /* set up a handler to pick up sigchld */
4076         se = event_add_signal(ctdb->ev, ctdb,
4077                                      SIGCHLD, 0,
4078                                      recd_sig_child_handler,
4079                                      ctdb);
4080         if (se == NULL) {
4081                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4082                 exit(1);
4083         }
4084
4085         monitor_cluster(ctdb);
4086
4087         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4088         return -1;
4089 }
4090
4091 /*
4092   shutdown the recovery daemon
4093  */
4094 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4095 {
4096         if (ctdb->recoverd_pid == 0) {
4097                 return;
4098         }
4099
4100         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4101         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4102
4103         TALLOC_FREE(ctdb->recd_ctx);
4104         TALLOC_FREE(ctdb->recd_ping_count);
4105 }
4106
4107 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
4108                        struct timeval t, void *private_data)
4109 {
4110         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4111
4112         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4113         ctdb_stop_recoverd(ctdb);
4114         ctdb_start_recoverd(ctdb);
4115 }