recoverd: Set node_flags information as soon as we get nodemap
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "dlinklist.h"
31
32
33 /* most recent reload all ips request we need to perform during the 
34    next monitoring loop
35 */
36 struct reloadips_all_reply *reload_all_ips_request = NULL;
37
38 /* list of "ctdb ipreallocate" processes to call back when we have
39    finished the takeover run.
40 */
41 struct ip_reallocate_list {
42         struct ip_reallocate_list *next;
43         struct rd_memdump_reply *rd;
44 };
45
46 struct ctdb_banning_state {
47         uint32_t count;
48         struct timeval last_reported_time;
49 };
50
51 /*
52   private state of recovery daemon
53  */
54 struct ctdb_recoverd {
55         struct ctdb_context *ctdb;
56         uint32_t recmaster;
57         uint32_t num_active;
58         uint32_t num_connected;
59         uint32_t last_culprit_node;
60         struct ctdb_node_map *nodemap;
61         struct timeval priority_time;
62         bool need_takeover_run;
63         bool need_recovery;
64         uint32_t node_flags;
65         struct timed_event *send_election_te;
66         struct timed_event *election_timeout;
67         struct vacuum_info *vacuum_info;
68         TALLOC_CTX *ip_reallocate_ctx;
69         struct ip_reallocate_list *reallocate_callers;
70         TALLOC_CTX *ip_check_disable_ctx;
71         struct ctdb_control_get_ifaces *ifaces;
72         TALLOC_CTX *deferred_rebalance_ctx;
73 };
74
75 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
76 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
77
78 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
79
80 /*
81   ban a node for a period of time
82  */
83 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
84 {
85         int ret;
86         struct ctdb_context *ctdb = rec->ctdb;
87         struct ctdb_ban_time bantime;
88        
89         if (!ctdb_validate_pnn(ctdb, pnn)) {
90                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
91                 return;
92         }
93
94         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
95
96         bantime.pnn  = pnn;
97         bantime.time = ban_time;
98
99         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
100         if (ret != 0) {
101                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
102                 return;
103         }
104
105 }
106
107 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
108
109
110 /*
111   remember the trouble maker
112  */
113 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
114 {
115         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
116         struct ctdb_banning_state *ban_state;
117
118         if (culprit > ctdb->num_nodes) {
119                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
120                 return;
121         }
122
123         if (ctdb->nodes[culprit]->ban_state == NULL) {
124                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
125                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
126
127                 
128         }
129         ban_state = ctdb->nodes[culprit]->ban_state;
130         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
131                 /* this was the first time in a long while this node
132                    misbehaved so we will forgive any old transgressions.
133                 */
134                 ban_state->count = 0;
135         }
136
137         ban_state->count += count;
138         ban_state->last_reported_time = timeval_current();
139         rec->last_culprit_node = culprit;
140 }
141
142 /*
143   remember the trouble maker
144  */
145 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
146 {
147         ctdb_set_culprit_count(rec, culprit, 1);
148 }
149
150
151 /* this callback is called for every node that failed to execute the
152    recovered event
153 */
154 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
155 {
156         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
157
158         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
159
160         ctdb_set_culprit(rec, node_pnn);
161 }
162
163 /*
164   run the "recovered" eventscript on all nodes
165  */
166 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
167 {
168         TALLOC_CTX *tmp_ctx;
169         uint32_t *nodes;
170         struct ctdb_context *ctdb = rec->ctdb;
171
172         tmp_ctx = talloc_new(ctdb);
173         CTDB_NO_MEMORY(ctdb, tmp_ctx);
174
175         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
176         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
177                                         nodes, 0,
178                                         CONTROL_TIMEOUT(), false, tdb_null,
179                                         NULL, recovered_fail_callback,
180                                         rec) != 0) {
181                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
182
183                 talloc_free(tmp_ctx);
184                 return -1;
185         }
186
187         talloc_free(tmp_ctx);
188         return 0;
189 }
190
191 /* this callback is called for every node that failed to execute the
192    start recovery event
193 */
194 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
195 {
196         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
197
198         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
199
200         ctdb_set_culprit(rec, node_pnn);
201 }
202
203 /*
204   run the "startrecovery" eventscript on all nodes
205  */
206 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
207 {
208         TALLOC_CTX *tmp_ctx;
209         uint32_t *nodes;
210         struct ctdb_context *ctdb = rec->ctdb;
211
212         tmp_ctx = talloc_new(ctdb);
213         CTDB_NO_MEMORY(ctdb, tmp_ctx);
214
215         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
216         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
217                                         nodes, 0,
218                                         CONTROL_TIMEOUT(), false, tdb_null,
219                                         NULL,
220                                         startrecovery_fail_callback,
221                                         rec) != 0) {
222                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
223                 talloc_free(tmp_ctx);
224                 return -1;
225         }
226
227         talloc_free(tmp_ctx);
228         return 0;
229 }
230
231 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
232 {
233         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
234                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
235                 return;
236         }
237         if (node_pnn < ctdb->num_nodes) {
238                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
239         }
240
241         if (node_pnn == ctdb->pnn) {
242                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
243         }
244 }
245
246 /*
247   update the node capabilities for all connected nodes
248  */
249 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
250 {
251         uint32_t *nodes;
252         TALLOC_CTX *tmp_ctx;
253
254         tmp_ctx = talloc_new(ctdb);
255         CTDB_NO_MEMORY(ctdb, tmp_ctx);
256
257         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
258         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
259                                         nodes, 0,
260                                         CONTROL_TIMEOUT(),
261                                         false, tdb_null,
262                                         async_getcap_callback, NULL,
263                                         NULL) != 0) {
264                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
265                 talloc_free(tmp_ctx);
266                 return -1;
267         }
268
269         talloc_free(tmp_ctx);
270         return 0;
271 }
272
273 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
274 {
275         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
276
277         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
278         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
279 }
280
281 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
282 {
283         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
284
285         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
286         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
287 }
288
289 /*
290   change recovery mode on all nodes
291  */
292 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
293 {
294         TDB_DATA data;
295         uint32_t *nodes;
296         TALLOC_CTX *tmp_ctx;
297
298         tmp_ctx = talloc_new(ctdb);
299         CTDB_NO_MEMORY(ctdb, tmp_ctx);
300
301         /* freeze all nodes */
302         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
303         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
304                 int i;
305
306                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
307                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
308                                                 nodes, i,
309                                                 CONTROL_TIMEOUT(),
310                                                 false, tdb_null,
311                                                 NULL,
312                                                 set_recmode_fail_callback,
313                                                 rec) != 0) {
314                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
315                                 talloc_free(tmp_ctx);
316                                 return -1;
317                         }
318                 }
319         }
320
321
322         data.dsize = sizeof(uint32_t);
323         data.dptr = (unsigned char *)&rec_mode;
324
325         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
326                                         nodes, 0,
327                                         CONTROL_TIMEOUT(),
328                                         false, data,
329                                         NULL, NULL,
330                                         NULL) != 0) {
331                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
332                 talloc_free(tmp_ctx);
333                 return -1;
334         }
335
336         talloc_free(tmp_ctx);
337         return 0;
338 }
339
340 /*
341   change recovery master on all node
342  */
343 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
344 {
345         TDB_DATA data;
346         TALLOC_CTX *tmp_ctx;
347         uint32_t *nodes;
348
349         tmp_ctx = talloc_new(ctdb);
350         CTDB_NO_MEMORY(ctdb, tmp_ctx);
351
352         data.dsize = sizeof(uint32_t);
353         data.dptr = (unsigned char *)&pnn;
354
355         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
356         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
357                                         nodes, 0,
358                                         CONTROL_TIMEOUT(), false, data,
359                                         NULL, NULL,
360                                         NULL) != 0) {
361                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
362                 talloc_free(tmp_ctx);
363                 return -1;
364         }
365
366         talloc_free(tmp_ctx);
367         return 0;
368 }
369
370 /* update all remote nodes to use the same db priority that we have
371    this can fail if the remove node has not yet been upgraded to 
372    support this function, so we always return success and never fail
373    a recovery if this call fails.
374 */
375 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
376         struct ctdb_node_map *nodemap, 
377         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
378 {
379         int db;
380         uint32_t *nodes;
381
382         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
383
384         /* step through all local databases */
385         for (db=0; db<dbmap->num;db++) {
386                 TDB_DATA data;
387                 struct ctdb_db_priority db_prio;
388                 int ret;
389
390                 db_prio.db_id     = dbmap->dbs[db].dbid;
391                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
392                 if (ret != 0) {
393                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
394                         continue;
395                 }
396
397                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
398
399                 data.dptr  = (uint8_t *)&db_prio;
400                 data.dsize = sizeof(db_prio);
401
402                 if (ctdb_client_async_control(ctdb,
403                                         CTDB_CONTROL_SET_DB_PRIORITY,
404                                         nodes, 0,
405                                         CONTROL_TIMEOUT(), false, data,
406                                         NULL, NULL,
407                                         NULL) != 0) {
408                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
409                 }
410         }
411
412         return 0;
413 }                       
414
415 /*
416   ensure all other nodes have attached to any databases that we have
417  */
418 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
419                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
420 {
421         int i, j, db, ret;
422         struct ctdb_dbid_map *remote_dbmap;
423
424         /* verify that all other nodes have all our databases */
425         for (j=0; j<nodemap->num; j++) {
426                 /* we dont need to ourself ourselves */
427                 if (nodemap->nodes[j].pnn == pnn) {
428                         continue;
429                 }
430                 /* dont check nodes that are unavailable */
431                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
432                         continue;
433                 }
434
435                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
436                                          mem_ctx, &remote_dbmap);
437                 if (ret != 0) {
438                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
439                         return -1;
440                 }
441
442                 /* step through all local databases */
443                 for (db=0; db<dbmap->num;db++) {
444                         const char *name;
445
446
447                         for (i=0;i<remote_dbmap->num;i++) {
448                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
449                                         break;
450                                 }
451                         }
452                         /* the remote node already have this database */
453                         if (i!=remote_dbmap->num) {
454                                 continue;
455                         }
456                         /* ok so we need to create this database */
457                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
458                                             mem_ctx, &name);
459                         if (ret != 0) {
460                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
461                                 return -1;
462                         }
463                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
464                                            mem_ctx, name,
465                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
466                         if (ret != 0) {
467                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
468                                 return -1;
469                         }
470                 }
471         }
472
473         return 0;
474 }
475
476
477 /*
478   ensure we are attached to any databases that anyone else is attached to
479  */
480 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
481                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
482 {
483         int i, j, db, ret;
484         struct ctdb_dbid_map *remote_dbmap;
485
486         /* verify that we have all database any other node has */
487         for (j=0; j<nodemap->num; j++) {
488                 /* we dont need to ourself ourselves */
489                 if (nodemap->nodes[j].pnn == pnn) {
490                         continue;
491                 }
492                 /* dont check nodes that are unavailable */
493                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
494                         continue;
495                 }
496
497                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
498                                          mem_ctx, &remote_dbmap);
499                 if (ret != 0) {
500                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
501                         return -1;
502                 }
503
504                 /* step through all databases on the remote node */
505                 for (db=0; db<remote_dbmap->num;db++) {
506                         const char *name;
507
508                         for (i=0;i<(*dbmap)->num;i++) {
509                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
510                                         break;
511                                 }
512                         }
513                         /* we already have this db locally */
514                         if (i!=(*dbmap)->num) {
515                                 continue;
516                         }
517                         /* ok so we need to create this database and
518                            rebuild dbmap
519                          */
520                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
521                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
522                         if (ret != 0) {
523                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
524                                           nodemap->nodes[j].pnn));
525                                 return -1;
526                         }
527                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
528                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
529                         if (ret != 0) {
530                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
531                                 return -1;
532                         }
533                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
534                         if (ret != 0) {
535                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
536                                 return -1;
537                         }
538                 }
539         }
540
541         return 0;
542 }
543
544
545 /*
546   pull the remote database contents from one node into the recdb
547  */
548 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
549                                     struct tdb_wrap *recdb, uint32_t dbid)
550 {
551         int ret;
552         TDB_DATA outdata;
553         struct ctdb_marshall_buffer *reply;
554         struct ctdb_rec_data *rec;
555         int i;
556         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
557
558         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
559                                CONTROL_TIMEOUT(), &outdata);
560         if (ret != 0) {
561                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
562                 talloc_free(tmp_ctx);
563                 return -1;
564         }
565
566         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
567
568         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
569                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
570                 talloc_free(tmp_ctx);
571                 return -1;
572         }
573         
574         rec = (struct ctdb_rec_data *)&reply->data[0];
575         
576         for (i=0;
577              i<reply->count;
578              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
579                 TDB_DATA key, data;
580                 struct ctdb_ltdb_header *hdr;
581                 TDB_DATA existing;
582                 
583                 key.dptr = &rec->data[0];
584                 key.dsize = rec->keylen;
585                 data.dptr = &rec->data[key.dsize];
586                 data.dsize = rec->datalen;
587                 
588                 hdr = (struct ctdb_ltdb_header *)data.dptr;
589
590                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
591                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
592                         talloc_free(tmp_ctx);
593                         return -1;
594                 }
595
596                 /* fetch the existing record, if any */
597                 existing = tdb_fetch(recdb->tdb, key);
598                 
599                 if (existing.dptr != NULL) {
600                         struct ctdb_ltdb_header header;
601                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
602                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
603                                          (unsigned)existing.dsize, srcnode));
604                                 free(existing.dptr);
605                                 talloc_free(tmp_ctx);
606                                 return -1;
607                         }
608                         header = *(struct ctdb_ltdb_header *)existing.dptr;
609                         free(existing.dptr);
610                         if (!(header.rsn < hdr->rsn ||
611                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
612                                 continue;
613                         }
614                 }
615                 
616                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
617                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
618                         talloc_free(tmp_ctx);
619                         return -1;                              
620                 }
621         }
622
623         talloc_free(tmp_ctx);
624
625         return 0;
626 }
627
628
629 struct pull_seqnum_cbdata {
630         int failed;
631         uint32_t pnn;
632         uint64_t seqnum;
633 };
634
635 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
636 {
637         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
638         uint64_t seqnum;
639
640         if (cb_data->failed != 0) {
641                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
642                 return;
643         }
644
645         if (res != 0) {
646                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
647                 cb_data->failed = 1;
648                 return;
649         }
650
651         if (outdata.dsize != sizeof(uint64_t)) {
652                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
653                 cb_data->failed = -1;
654                 return;
655         }
656
657         seqnum = *((uint64_t *)outdata.dptr);
658
659         if (seqnum > cb_data->seqnum) {
660                 cb_data->seqnum = seqnum;
661                 cb_data->pnn = node_pnn;
662         }
663 }
664
665 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
666 {
667         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
668
669         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
670         cb_data->failed = 1;
671 }
672
673 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
674                                 struct ctdb_recoverd *rec, 
675                                 struct ctdb_node_map *nodemap, 
676                                 struct tdb_wrap *recdb, uint32_t dbid)
677 {
678         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
679         uint32_t *nodes;
680         TDB_DATA data;
681         uint32_t outdata[2];
682         struct pull_seqnum_cbdata *cb_data;
683
684         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
685
686         outdata[0] = dbid;
687         outdata[1] = 0;
688
689         data.dsize = sizeof(outdata);
690         data.dptr  = (uint8_t *)&outdata[0];
691
692         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
693         if (cb_data == NULL) {
694                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
695                 talloc_free(tmp_ctx);
696                 return -1;
697         }
698
699         cb_data->failed = 0;
700         cb_data->pnn    = -1;
701         cb_data->seqnum = 0;
702         
703         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
704         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
705                                         nodes, 0,
706                                         CONTROL_TIMEOUT(), false, data,
707                                         pull_seqnum_cb,
708                                         pull_seqnum_fail_cb,
709                                         cb_data) != 0) {
710                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
711
712                 talloc_free(tmp_ctx);
713                 return -1;
714         }
715
716         if (cb_data->failed != 0) {
717                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
718                 talloc_free(tmp_ctx);
719                 return -1;
720         }
721
722         if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
723                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
724                 talloc_free(tmp_ctx);
725                 return -1;
726         }
727
728         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
729
730         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
731                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
732                 talloc_free(tmp_ctx);
733                 return -1;
734         }
735
736         talloc_free(tmp_ctx);
737         return 0;
738 }
739
740
741 /*
742   pull all the remote database contents into the recdb
743  */
744 static int pull_remote_database(struct ctdb_context *ctdb,
745                                 struct ctdb_recoverd *rec, 
746                                 struct ctdb_node_map *nodemap, 
747                                 struct tdb_wrap *recdb, uint32_t dbid,
748                                 bool persistent)
749 {
750         int j;
751
752         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
753                 int ret;
754                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
755                 if (ret == 0) {
756                         return 0;
757                 }
758         }
759
760         /* pull all records from all other nodes across onto this node
761            (this merges based on rsn)
762         */
763         for (j=0; j<nodemap->num; j++) {
764                 /* dont merge from nodes that are unavailable */
765                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
766                         continue;
767                 }
768                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
769                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
770                                  nodemap->nodes[j].pnn));
771                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
772                         return -1;
773                 }
774         }
775         
776         return 0;
777 }
778
779
780 /*
781   update flags on all active nodes
782  */
783 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
784 {
785         int ret;
786
787         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
788                 if (ret != 0) {
789                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
790                 return -1;
791         }
792
793         return 0;
794 }
795
796 /*
797   ensure all nodes have the same vnnmap we do
798  */
799 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
800                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
801 {
802         int j, ret;
803
804         /* push the new vnn map out to all the nodes */
805         for (j=0; j<nodemap->num; j++) {
806                 /* dont push to nodes that are unavailable */
807                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
808                         continue;
809                 }
810
811                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
812                 if (ret != 0) {
813                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
814                         return -1;
815                 }
816         }
817
818         return 0;
819 }
820
821
822 struct vacuum_info {
823         struct vacuum_info *next, *prev;
824         struct ctdb_recoverd *rec;
825         uint32_t srcnode;
826         struct ctdb_db_context *ctdb_db;
827         struct ctdb_marshall_buffer *recs;
828         struct ctdb_rec_data *r;
829 };
830
831 static void vacuum_fetch_next(struct vacuum_info *v);
832
833 /*
834   called when a vacuum fetch has completed - just free it and do the next one
835  */
836 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
837 {
838         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
839         talloc_free(state);
840         vacuum_fetch_next(v);
841 }
842
843
844 /*
845   process the next element from the vacuum list
846 */
847 static void vacuum_fetch_next(struct vacuum_info *v)
848 {
849         struct ctdb_call call;
850         struct ctdb_rec_data *r;
851
852         while (v->recs->count) {
853                 struct ctdb_client_call_state *state;
854                 TDB_DATA data;
855                 struct ctdb_ltdb_header *hdr;
856
857                 ZERO_STRUCT(call);
858                 call.call_id = CTDB_NULL_FUNC;
859                 call.flags = CTDB_IMMEDIATE_MIGRATION;
860                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
861
862                 r = v->r;
863                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
864                 v->recs->count--;
865
866                 call.key.dptr = &r->data[0];
867                 call.key.dsize = r->keylen;
868
869                 /* ensure we don't block this daemon - just skip a record if we can't get
870                    the chainlock */
871                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
872                         continue;
873                 }
874
875                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
876                 if (data.dptr == NULL) {
877                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
878                         continue;
879                 }
880
881                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
882                         free(data.dptr);
883                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
884                         continue;
885                 }
886                 
887                 hdr = (struct ctdb_ltdb_header *)data.dptr;
888                 if (hdr->dmaster == v->rec->ctdb->pnn) {
889                         /* its already local */
890                         free(data.dptr);
891                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
892                         continue;
893                 }
894
895                 free(data.dptr);
896
897                 state = ctdb_call_send(v->ctdb_db, &call);
898                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
899                 if (state == NULL) {
900                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
901                         talloc_free(v);
902                         return;
903                 }
904                 state->async.fn = vacuum_fetch_callback;
905                 state->async.private_data = v;
906                 return;
907         }
908
909         talloc_free(v);
910 }
911
912
913 /*
914   destroy a vacuum info structure
915  */
916 static int vacuum_info_destructor(struct vacuum_info *v)
917 {
918         DLIST_REMOVE(v->rec->vacuum_info, v);
919         return 0;
920 }
921
922
923 /*
924   handler for vacuum fetch
925 */
926 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
927                                  TDB_DATA data, void *private_data)
928 {
929         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
930         struct ctdb_marshall_buffer *recs;
931         int ret, i;
932         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
933         const char *name;
934         struct ctdb_dbid_map *dbmap=NULL;
935         bool persistent = false;
936         struct ctdb_db_context *ctdb_db;
937         struct ctdb_rec_data *r;
938         uint32_t srcnode;
939         struct vacuum_info *v;
940
941         recs = (struct ctdb_marshall_buffer *)data.dptr;
942         r = (struct ctdb_rec_data *)&recs->data[0];
943
944         if (recs->count == 0) {
945                 talloc_free(tmp_ctx);
946                 return;
947         }
948
949         srcnode = r->reqid;
950
951         for (v=rec->vacuum_info;v;v=v->next) {
952                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
953                         /* we're already working on records from this node */
954                         talloc_free(tmp_ctx);
955                         return;
956                 }
957         }
958
959         /* work out if the database is persistent */
960         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
961         if (ret != 0) {
962                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
963                 talloc_free(tmp_ctx);
964                 return;
965         }
966
967         for (i=0;i<dbmap->num;i++) {
968                 if (dbmap->dbs[i].dbid == recs->db_id) {
969                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
970                         break;
971                 }
972         }
973         if (i == dbmap->num) {
974                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
975                 talloc_free(tmp_ctx);
976                 return;         
977         }
978
979         /* find the name of this database */
980         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
981                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
982                 talloc_free(tmp_ctx);
983                 return;
984         }
985
986         /* attach to it */
987         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
988         if (ctdb_db == NULL) {
989                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
990                 talloc_free(tmp_ctx);
991                 return;
992         }
993
994         v = talloc_zero(rec, struct vacuum_info);
995         if (v == NULL) {
996                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
997                 talloc_free(tmp_ctx);
998                 return;
999         }
1000
1001         v->rec = rec;
1002         v->srcnode = srcnode;
1003         v->ctdb_db = ctdb_db;
1004         v->recs = talloc_memdup(v, recs, data.dsize);
1005         if (v->recs == NULL) {
1006                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1007                 talloc_free(v);
1008                 talloc_free(tmp_ctx);
1009                 return;         
1010         }
1011         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
1012
1013         DLIST_ADD(rec->vacuum_info, v);
1014
1015         talloc_set_destructor(v, vacuum_info_destructor);
1016
1017         vacuum_fetch_next(v);
1018         talloc_free(tmp_ctx);
1019 }
1020
1021
1022 /*
1023   called when ctdb_wait_timeout should finish
1024  */
1025 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1026                               struct timeval yt, void *p)
1027 {
1028         uint32_t *timed_out = (uint32_t *)p;
1029         (*timed_out) = 1;
1030 }
1031
1032 /*
1033   wait for a given number of seconds
1034  */
1035 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1036 {
1037         uint32_t timed_out = 0;
1038         time_t usecs = (secs - (time_t)secs) * 1000000;
1039         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1040         while (!timed_out) {
1041                 event_loop_once(ctdb->ev);
1042         }
1043 }
1044
1045 /*
1046   called when an election times out (ends)
1047  */
1048 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1049                                   struct timeval t, void *p)
1050 {
1051         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1052         rec->election_timeout = NULL;
1053         fast_start = false;
1054
1055         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1056 }
1057
1058
1059 /*
1060   wait for an election to finish. It finished election_timeout seconds after
1061   the last election packet is received
1062  */
1063 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1064 {
1065         struct ctdb_context *ctdb = rec->ctdb;
1066         while (rec->election_timeout) {
1067                 event_loop_once(ctdb->ev);
1068         }
1069 }
1070
1071 /*
1072   Update our local flags from all remote connected nodes. 
1073   This is only run when we are or we belive we are the recovery master
1074  */
1075 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1076 {
1077         int j;
1078         struct ctdb_context *ctdb = rec->ctdb;
1079         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1080
1081         /* get the nodemap for all active remote nodes and verify
1082            they are the same as for this node
1083          */
1084         for (j=0; j<nodemap->num; j++) {
1085                 struct ctdb_node_map *remote_nodemap=NULL;
1086                 int ret;
1087
1088                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1089                         continue;
1090                 }
1091                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1092                         continue;
1093                 }
1094
1095                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1096                                            mem_ctx, &remote_nodemap);
1097                 if (ret != 0) {
1098                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1099                                   nodemap->nodes[j].pnn));
1100                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1101                         talloc_free(mem_ctx);
1102                         return MONITOR_FAILED;
1103                 }
1104                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1105                         /* We should tell our daemon about this so it
1106                            updates its flags or else we will log the same 
1107                            message again in the next iteration of recovery.
1108                            Since we are the recovery master we can just as
1109                            well update the flags on all nodes.
1110                         */
1111                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1112                         if (ret != 0) {
1113                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1114                                 return -1;
1115                         }
1116
1117                         /* Update our local copy of the flags in the recovery
1118                            daemon.
1119                         */
1120                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1121                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1122                                  nodemap->nodes[j].flags));
1123                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1124                 }
1125                 talloc_free(remote_nodemap);
1126         }
1127         talloc_free(mem_ctx);
1128         return MONITOR_OK;
1129 }
1130
1131
1132 /* Create a new random generation ip. 
1133    The generation id can not be the INVALID_GENERATION id
1134 */
1135 static uint32_t new_generation(void)
1136 {
1137         uint32_t generation;
1138
1139         while (1) {
1140                 generation = random();
1141
1142                 if (generation != INVALID_GENERATION) {
1143                         break;
1144                 }
1145         }
1146
1147         return generation;
1148 }
1149
1150
1151 /*
1152   create a temporary working database
1153  */
1154 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1155 {
1156         char *name;
1157         struct tdb_wrap *recdb;
1158         unsigned tdb_flags;
1159
1160         /* open up the temporary recovery database */
1161         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1162                                ctdb->db_directory_state,
1163                                ctdb->pnn);
1164         if (name == NULL) {
1165                 return NULL;
1166         }
1167         unlink(name);
1168
1169         tdb_flags = TDB_NOLOCK;
1170         if (ctdb->valgrinding) {
1171                 tdb_flags |= TDB_NOMMAP;
1172         }
1173         tdb_flags |= TDB_DISALLOW_NESTING;
1174
1175         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1176                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1177         if (recdb == NULL) {
1178                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1179         }
1180
1181         talloc_free(name);
1182
1183         return recdb;
1184 }
1185
1186
1187 /* 
1188    a traverse function for pulling all relevant records from recdb
1189  */
1190 struct recdb_data {
1191         struct ctdb_context *ctdb;
1192         struct ctdb_marshall_buffer *recdata;
1193         uint32_t len;
1194         uint32_t allocated_len;
1195         bool failed;
1196         bool persistent;
1197 };
1198
1199 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1200 {
1201         struct recdb_data *params = (struct recdb_data *)p;
1202         struct ctdb_rec_data *rec;
1203         struct ctdb_ltdb_header *hdr;
1204
1205         /*
1206          * skip empty records - but NOT for persistent databases:
1207          *
1208          * The record-by-record mode of recovery deletes empty records.
1209          * For persistent databases, this can lead to data corruption
1210          * by deleting records that should be there:
1211          *
1212          * - Assume the cluster has been running for a while.
1213          *
1214          * - A record R in a persistent database has been created and
1215          *   deleted a couple of times, the last operation being deletion,
1216          *   leaving an empty record with a high RSN, say 10.
1217          *
1218          * - Now a node N is turned off.
1219          *
1220          * - This leaves the local database copy of D on N with the empty
1221          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1222          *   the copy of record R.
1223          *
1224          * - Now the record is created again while node N is turned off.
1225          *   This creates R with RSN = 1 on all nodes except for N.
1226          *
1227          * - Now node N is turned on again. The following recovery will chose
1228          *   the older empty copy of R due to RSN 10 > RSN 1.
1229          *
1230          * ==> Hence the record is gone after the recovery.
1231          *
1232          * On databases like Samba's registry, this can damage the higher-level
1233          * data structures built from the various tdb-level records.
1234          */
1235         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1236                 return 0;
1237         }
1238
1239         /* update the dmaster field to point to us */
1240         hdr = (struct ctdb_ltdb_header *)data.dptr;
1241         if (!params->persistent) {
1242                 hdr->dmaster = params->ctdb->pnn;
1243                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1244         }
1245
1246         /* add the record to the blob ready to send to the nodes */
1247         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1248         if (rec == NULL) {
1249                 params->failed = true;
1250                 return -1;
1251         }
1252         if (params->len + rec->length >= params->allocated_len) {
1253                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1254                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1255         }
1256         if (params->recdata == NULL) {
1257                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1258                          rec->length + params->len, params->recdata->count));
1259                 params->failed = true;
1260                 return -1;
1261         }
1262         params->recdata->count++;
1263         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1264         params->len += rec->length;
1265         talloc_free(rec);
1266
1267         return 0;
1268 }
1269
1270 /*
1271   push the recdb database out to all nodes
1272  */
1273 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1274                                bool persistent,
1275                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1276 {
1277         struct recdb_data params;
1278         struct ctdb_marshall_buffer *recdata;
1279         TDB_DATA outdata;
1280         TALLOC_CTX *tmp_ctx;
1281         uint32_t *nodes;
1282
1283         tmp_ctx = talloc_new(ctdb);
1284         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1285
1286         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1287         CTDB_NO_MEMORY(ctdb, recdata);
1288
1289         recdata->db_id = dbid;
1290
1291         params.ctdb = ctdb;
1292         params.recdata = recdata;
1293         params.len = offsetof(struct ctdb_marshall_buffer, data);
1294         params.allocated_len = params.len;
1295         params.failed = false;
1296         params.persistent = persistent;
1297
1298         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1299                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1300                 talloc_free(params.recdata);
1301                 talloc_free(tmp_ctx);
1302                 return -1;
1303         }
1304
1305         if (params.failed) {
1306                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1307                 talloc_free(params.recdata);
1308                 talloc_free(tmp_ctx);
1309                 return -1;              
1310         }
1311
1312         recdata = params.recdata;
1313
1314         outdata.dptr = (void *)recdata;
1315         outdata.dsize = params.len;
1316
1317         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1318         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1319                                         nodes, 0,
1320                                         CONTROL_TIMEOUT(), false, outdata,
1321                                         NULL, NULL,
1322                                         NULL) != 0) {
1323                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1324                 talloc_free(recdata);
1325                 talloc_free(tmp_ctx);
1326                 return -1;
1327         }
1328
1329         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1330                   dbid, recdata->count));
1331
1332         talloc_free(recdata);
1333         talloc_free(tmp_ctx);
1334
1335         return 0;
1336 }
1337
1338
1339 /*
1340   go through a full recovery on one database 
1341  */
1342 static int recover_database(struct ctdb_recoverd *rec, 
1343                             TALLOC_CTX *mem_ctx,
1344                             uint32_t dbid,
1345                             bool persistent,
1346                             uint32_t pnn, 
1347                             struct ctdb_node_map *nodemap,
1348                             uint32_t transaction_id)
1349 {
1350         struct tdb_wrap *recdb;
1351         int ret;
1352         struct ctdb_context *ctdb = rec->ctdb;
1353         TDB_DATA data;
1354         struct ctdb_control_wipe_database w;
1355         uint32_t *nodes;
1356
1357         recdb = create_recdb(ctdb, mem_ctx);
1358         if (recdb == NULL) {
1359                 return -1;
1360         }
1361
1362         /* pull all remote databases onto the recdb */
1363         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1364         if (ret != 0) {
1365                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1366                 return -1;
1367         }
1368
1369         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1370
1371         /* wipe all the remote databases. This is safe as we are in a transaction */
1372         w.db_id = dbid;
1373         w.transaction_id = transaction_id;
1374
1375         data.dptr = (void *)&w;
1376         data.dsize = sizeof(w);
1377
1378         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1379         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1380                                         nodes, 0,
1381                                         CONTROL_TIMEOUT(), false, data,
1382                                         NULL, NULL,
1383                                         NULL) != 0) {
1384                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1385                 talloc_free(recdb);
1386                 return -1;
1387         }
1388         
1389         /* push out the correct database. This sets the dmaster and skips 
1390            the empty records */
1391         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1392         if (ret != 0) {
1393                 talloc_free(recdb);
1394                 return -1;
1395         }
1396
1397         /* all done with this database */
1398         talloc_free(recdb);
1399
1400         return 0;
1401 }
1402
1403 /*
1404   reload the nodes file 
1405 */
1406 static void reload_nodes_file(struct ctdb_context *ctdb)
1407 {
1408         ctdb->nodes = NULL;
1409         ctdb_load_nodes_file(ctdb);
1410 }
1411
1412 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1413                                          struct ctdb_recoverd *rec,
1414                                          struct ctdb_node_map *nodemap,
1415                                          uint32_t *culprit)
1416 {
1417         int j;
1418         int ret;
1419
1420         if (ctdb->num_nodes != nodemap->num) {
1421                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1422                                   ctdb->num_nodes, nodemap->num));
1423                 if (culprit) {
1424                         *culprit = ctdb->pnn;
1425                 }
1426                 return -1;
1427         }
1428
1429         for (j=0; j<nodemap->num; j++) {
1430                 /* release any existing data */
1431                 if (ctdb->nodes[j]->known_public_ips) {
1432                         talloc_free(ctdb->nodes[j]->known_public_ips);
1433                         ctdb->nodes[j]->known_public_ips = NULL;
1434                 }
1435                 if (ctdb->nodes[j]->available_public_ips) {
1436                         talloc_free(ctdb->nodes[j]->available_public_ips);
1437                         ctdb->nodes[j]->available_public_ips = NULL;
1438                 }
1439
1440                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1441                         continue;
1442                 }
1443
1444                 /* grab a new shiny list of public ips from the node */
1445                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1446                                         CONTROL_TIMEOUT(),
1447                                         ctdb->nodes[j]->pnn,
1448                                         ctdb->nodes,
1449                                         0,
1450                                         &ctdb->nodes[j]->known_public_ips);
1451                 if (ret != 0) {
1452                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1453                                 ctdb->nodes[j]->pnn));
1454                         if (culprit) {
1455                                 *culprit = ctdb->nodes[j]->pnn;
1456                         }
1457                         return -1;
1458                 }
1459
1460                 if (ctdb->do_checkpublicip) {
1461                         if (rec->ip_check_disable_ctx == NULL) {
1462                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1463                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1464                                         rec->need_takeover_run = true;
1465                                 }
1466                         }
1467                 }
1468
1469                 /* grab a new shiny list of public ips from the node */
1470                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1471                                         CONTROL_TIMEOUT(),
1472                                         ctdb->nodes[j]->pnn,
1473                                         ctdb->nodes,
1474                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1475                                         &ctdb->nodes[j]->available_public_ips);
1476                 if (ret != 0) {
1477                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1478                                 ctdb->nodes[j]->pnn));
1479                         if (culprit) {
1480                                 *culprit = ctdb->nodes[j]->pnn;
1481                         }
1482                         return -1;
1483                 }
1484         }
1485
1486         return 0;
1487 }
1488
1489 /* when we start a recovery, make sure all nodes use the same reclock file
1490    setting
1491 */
1492 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1493 {
1494         struct ctdb_context *ctdb = rec->ctdb;
1495         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1496         TDB_DATA data;
1497         uint32_t *nodes;
1498
1499         if (ctdb->recovery_lock_file == NULL) {
1500                 data.dptr  = NULL;
1501                 data.dsize = 0;
1502         } else {
1503                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1504                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1505         }
1506
1507         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1508         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1509                                         nodes, 0,
1510                                         CONTROL_TIMEOUT(),
1511                                         false, data,
1512                                         NULL, NULL,
1513                                         rec) != 0) {
1514                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1515                 talloc_free(tmp_ctx);
1516                 return -1;
1517         }
1518
1519         talloc_free(tmp_ctx);
1520         return 0;
1521 }
1522
1523
1524 /*
1525  * this callback is called for every node that failed to execute ctdb_takeover_run()
1526  * and set flag to re-run takeover run.
1527  */
1528 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1529 {
1530         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1531
1532         if (callback_data != NULL) {
1533                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1534
1535                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1536
1537                 ctdb_set_culprit(rec, node_pnn);
1538                 rec->need_takeover_run = true;
1539         }
1540 }
1541
1542
1543 /*
1544   we are the recmaster, and recovery is needed - start a recovery run
1545  */
1546 static int do_recovery(struct ctdb_recoverd *rec, 
1547                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1548                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1549 {
1550         struct ctdb_context *ctdb = rec->ctdb;
1551         int i, j, ret;
1552         uint32_t generation;
1553         struct ctdb_dbid_map *dbmap;
1554         TDB_DATA data;
1555         uint32_t *nodes;
1556         struct timeval start_time;
1557         uint32_t culprit = (uint32_t)-1;
1558
1559         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1560
1561         /* if recovery fails, force it again */
1562         rec->need_recovery = true;
1563
1564         for (i=0; i<ctdb->num_nodes; i++) {
1565                 struct ctdb_banning_state *ban_state;
1566
1567                 if (ctdb->nodes[i]->ban_state == NULL) {
1568                         continue;
1569                 }
1570                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1571                 if (ban_state->count < 2*ctdb->num_nodes) {
1572                         continue;
1573                 }
1574                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1575                         ctdb->nodes[i]->pnn, ban_state->count,
1576                         ctdb->tunable.recovery_ban_period));
1577                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1578                 ban_state->count = 0;
1579         }
1580
1581
1582         if (ctdb->tunable.verify_recovery_lock != 0) {
1583                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1584                 start_time = timeval_current();
1585                 if (!ctdb_recovery_lock(ctdb, true)) {
1586                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1587                                          "and ban ourself for %u seconds\n",
1588                                          ctdb->tunable.recovery_ban_period));
1589                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1590                         return -1;
1591                 }
1592                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1593                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1594         }
1595
1596         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1597
1598         /* get a list of all databases */
1599         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1600         if (ret != 0) {
1601                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1602                 return -1;
1603         }
1604
1605         /* we do the db creation before we set the recovery mode, so the freeze happens
1606            on all databases we will be dealing with. */
1607
1608         /* verify that we have all the databases any other node has */
1609         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1610         if (ret != 0) {
1611                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1612                 return -1;
1613         }
1614
1615         /* verify that all other nodes have all our databases */
1616         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1617         if (ret != 0) {
1618                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1619                 return -1;
1620         }
1621         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1622
1623         /* update the database priority for all remote databases */
1624         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1625         if (ret != 0) {
1626                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1627         }
1628         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1629
1630
1631         /* update all other nodes to use the same setting for reclock files
1632            as the local recovery master.
1633         */
1634         sync_recovery_lock_file_across_cluster(rec);
1635
1636         /* set recovery mode to active on all nodes */
1637         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1638         if (ret != 0) {
1639                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1640                 return -1;
1641         }
1642
1643         /* execute the "startrecovery" event script on all nodes */
1644         ret = run_startrecovery_eventscript(rec, nodemap);
1645         if (ret!=0) {
1646                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1647                 return -1;
1648         }
1649
1650         /*
1651           update all nodes to have the same flags that we have
1652          */
1653         for (i=0;i<nodemap->num;i++) {
1654                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1655                         continue;
1656                 }
1657
1658                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1659                 if (ret != 0) {
1660                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1661                         return -1;
1662                 }
1663         }
1664
1665         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1666
1667         /* pick a new generation number */
1668         generation = new_generation();
1669
1670         /* change the vnnmap on this node to use the new generation 
1671            number but not on any other nodes.
1672            this guarantees that if we abort the recovery prematurely
1673            for some reason (a node stops responding?)
1674            that we can just return immediately and we will reenter
1675            recovery shortly again.
1676            I.e. we deliberately leave the cluster with an inconsistent
1677            generation id to allow us to abort recovery at any stage and
1678            just restart it from scratch.
1679          */
1680         vnnmap->generation = generation;
1681         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1682         if (ret != 0) {
1683                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1684                 return -1;
1685         }
1686
1687         data.dptr = (void *)&generation;
1688         data.dsize = sizeof(uint32_t);
1689
1690         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1691         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1692                                         nodes, 0,
1693                                         CONTROL_TIMEOUT(), false, data,
1694                                         NULL,
1695                                         transaction_start_fail_callback,
1696                                         rec) != 0) {
1697                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1698                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1699                                         nodes, 0,
1700                                         CONTROL_TIMEOUT(), false, tdb_null,
1701                                         NULL,
1702                                         NULL,
1703                                         NULL) != 0) {
1704                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1705                 }
1706                 return -1;
1707         }
1708
1709         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1710
1711         for (i=0;i<dbmap->num;i++) {
1712                 ret = recover_database(rec, mem_ctx,
1713                                        dbmap->dbs[i].dbid,
1714                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1715                                        pnn, nodemap, generation);
1716                 if (ret != 0) {
1717                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1718                         return -1;
1719                 }
1720         }
1721
1722         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1723
1724         /* commit all the changes */
1725         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1726                                         nodes, 0,
1727                                         CONTROL_TIMEOUT(), false, data,
1728                                         NULL, NULL,
1729                                         NULL) != 0) {
1730                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1731                 return -1;
1732         }
1733
1734         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1735         
1736
1737         /* update the capabilities for all nodes */
1738         ret = update_capabilities(ctdb, nodemap);
1739         if (ret!=0) {
1740                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1741                 return -1;
1742         }
1743
1744         /* build a new vnn map with all the currently active and
1745            unbanned nodes */
1746         generation = new_generation();
1747         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1748         CTDB_NO_MEMORY(ctdb, vnnmap);
1749         vnnmap->generation = generation;
1750         vnnmap->size = 0;
1751         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1752         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1753         for (i=j=0;i<nodemap->num;i++) {
1754                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1755                         continue;
1756                 }
1757                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1758                         /* this node can not be an lmaster */
1759                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1760                         continue;
1761                 }
1762
1763                 vnnmap->size++;
1764                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1765                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1766                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1767
1768         }
1769         if (vnnmap->size == 0) {
1770                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1771                 vnnmap->size++;
1772                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1773                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1774                 vnnmap->map[0] = pnn;
1775         }       
1776
1777         /* update to the new vnnmap on all nodes */
1778         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1779         if (ret != 0) {
1780                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1781                 return -1;
1782         }
1783
1784         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1785
1786         /* update recmaster to point to us for all nodes */
1787         ret = set_recovery_master(ctdb, nodemap, pnn);
1788         if (ret!=0) {
1789                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1790                 return -1;
1791         }
1792
1793         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1794
1795         /*
1796           update all nodes to have the same flags that we have
1797          */
1798         for (i=0;i<nodemap->num;i++) {
1799                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1800                         continue;
1801                 }
1802
1803                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1804                 if (ret != 0) {
1805                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1806                         return -1;
1807                 }
1808         }
1809
1810         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1811
1812         /* disable recovery mode */
1813         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1814         if (ret != 0) {
1815                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1816                 return -1;
1817         }
1818
1819         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1820
1821         /*
1822           tell nodes to takeover their public IPs
1823          */
1824         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1825         if (ret != 0) {
1826                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1827                                  culprit));
1828                 rec->need_takeover_run = true;
1829                 return -1;
1830         }
1831         rec->need_takeover_run = false;
1832         ret = ctdb_takeover_run(ctdb, nodemap, takeover_fail_callback, NULL);
1833         if (ret != 0) {
1834                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1835                 rec->need_takeover_run = true;
1836         }
1837
1838         /* execute the "recovered" event script on all nodes */
1839         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
1840         if (ret!=0) {
1841                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1842                 return -1;
1843         }
1844
1845         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1846
1847         /* send a message to all clients telling them that the cluster 
1848            has been reconfigured */
1849         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1850
1851         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1852
1853         rec->need_recovery = false;
1854
1855         /* we managed to complete a full recovery, make sure to forgive
1856            any past sins by the nodes that could now participate in the
1857            recovery.
1858         */
1859         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1860         for (i=0;i<nodemap->num;i++) {
1861                 struct ctdb_banning_state *ban_state;
1862
1863                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1864                         continue;
1865                 }
1866
1867                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1868                 if (ban_state == NULL) {
1869                         continue;
1870                 }
1871
1872                 ban_state->count = 0;
1873         }
1874
1875
1876         /* We just finished a recovery successfully. 
1877            We now wait for rerecovery_timeout before we allow 
1878            another recovery to take place.
1879         */
1880         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1881         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1882         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1883
1884         return 0;
1885 }
1886
1887
1888 /*
1889   elections are won by first checking the number of connected nodes, then
1890   the priority time, then the pnn
1891  */
1892 struct election_message {
1893         uint32_t num_connected;
1894         struct timeval priority_time;
1895         uint32_t pnn;
1896         uint32_t node_flags;
1897 };
1898
1899 /*
1900   form this nodes election data
1901  */
1902 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1903 {
1904         int ret, i;
1905         struct ctdb_node_map *nodemap;
1906         struct ctdb_context *ctdb = rec->ctdb;
1907
1908         ZERO_STRUCTP(em);
1909
1910         em->pnn = rec->ctdb->pnn;
1911         em->priority_time = rec->priority_time;
1912
1913         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1914         if (ret != 0) {
1915                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1916                 return;
1917         }
1918
1919         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1920         em->node_flags = rec->node_flags;
1921
1922         for (i=0;i<nodemap->num;i++) {
1923                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1924                         em->num_connected++;
1925                 }
1926         }
1927
1928         /* we shouldnt try to win this election if we cant be a recmaster */
1929         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1930                 em->num_connected = 0;
1931                 em->priority_time = timeval_current();
1932         }
1933
1934         talloc_free(nodemap);
1935 }
1936
1937 /*
1938   see if the given election data wins
1939  */
1940 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1941 {
1942         struct election_message myem;
1943         int cmp = 0;
1944
1945         ctdb_election_data(rec, &myem);
1946
1947         /* we cant win if we dont have the recmaster capability */
1948         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1949                 return false;
1950         }
1951
1952         /* we cant win if we are banned */
1953         if (rec->node_flags & NODE_FLAGS_BANNED) {
1954                 return false;
1955         }       
1956
1957         /* we cant win if we are stopped */
1958         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1959                 return false;
1960         }       
1961
1962         /* we will automatically win if the other node is banned */
1963         if (em->node_flags & NODE_FLAGS_BANNED) {
1964                 return true;
1965         }
1966
1967         /* we will automatically win if the other node is banned */
1968         if (em->node_flags & NODE_FLAGS_STOPPED) {
1969                 return true;
1970         }
1971
1972         /* try to use the most connected node */
1973         if (cmp == 0) {
1974                 cmp = (int)myem.num_connected - (int)em->num_connected;
1975         }
1976
1977         /* then the longest running node */
1978         if (cmp == 0) {
1979                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1980         }
1981
1982         if (cmp == 0) {
1983                 cmp = (int)myem.pnn - (int)em->pnn;
1984         }
1985
1986         return cmp > 0;
1987 }
1988
1989 /*
1990   send out an election request
1991  */
1992 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1993 {
1994         int ret;
1995         TDB_DATA election_data;
1996         struct election_message emsg;
1997         uint64_t srvid;
1998         struct ctdb_context *ctdb = rec->ctdb;
1999
2000         srvid = CTDB_SRVID_RECOVERY;
2001
2002         ctdb_election_data(rec, &emsg);
2003
2004         election_data.dsize = sizeof(struct election_message);
2005         election_data.dptr  = (unsigned char *)&emsg;
2006
2007
2008         /* send an election message to all active nodes */
2009         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2010         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2011
2012
2013         /* A new node that is already frozen has entered the cluster.
2014            The existing nodes are not frozen and dont need to be frozen
2015            until the election has ended and we start the actual recovery
2016         */
2017         if (update_recmaster == true) {
2018                 /* first we assume we will win the election and set 
2019                    recoverymaster to be ourself on the current node
2020                  */
2021                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2022                 if (ret != 0) {
2023                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2024                         return -1;
2025                 }
2026         }
2027
2028
2029         return 0;
2030 }
2031
2032 /*
2033   this function will unban all nodes in the cluster
2034 */
2035 static void unban_all_nodes(struct ctdb_context *ctdb)
2036 {
2037         int ret, i;
2038         struct ctdb_node_map *nodemap;
2039         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2040         
2041         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2042         if (ret != 0) {
2043                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2044                 return;
2045         }
2046
2047         for (i=0;i<nodemap->num;i++) {
2048                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2049                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2050                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
2051                 }
2052         }
2053
2054         talloc_free(tmp_ctx);
2055 }
2056
2057
2058 /*
2059   we think we are winning the election - send a broadcast election request
2060  */
2061 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2062 {
2063         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2064         int ret;
2065
2066         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
2067         if (ret != 0) {
2068                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2069         }
2070
2071         talloc_free(rec->send_election_te);
2072         rec->send_election_te = NULL;
2073 }
2074
2075 /*
2076   handler for memory dumps
2077 */
2078 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2079                              TDB_DATA data, void *private_data)
2080 {
2081         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2082         TDB_DATA *dump;
2083         int ret;
2084         struct rd_memdump_reply *rd;
2085
2086         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2087                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2088                 talloc_free(tmp_ctx);
2089                 return;
2090         }
2091         rd = (struct rd_memdump_reply *)data.dptr;
2092
2093         dump = talloc_zero(tmp_ctx, TDB_DATA);
2094         if (dump == NULL) {
2095                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2096                 talloc_free(tmp_ctx);
2097                 return;
2098         }
2099         ret = ctdb_dump_memory(ctdb, dump);
2100         if (ret != 0) {
2101                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2102                 talloc_free(tmp_ctx);
2103                 return;
2104         }
2105
2106 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
2107
2108         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2109         if (ret != 0) {
2110                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2111                 talloc_free(tmp_ctx);
2112                 return;
2113         }
2114
2115         talloc_free(tmp_ctx);
2116 }
2117
2118 /*
2119   handler for getlog
2120 */
2121 static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2122                            TDB_DATA data, void *private_data)
2123 {
2124         struct ctdb_get_log_addr *log_addr;
2125         pid_t child;
2126
2127         if (data.dsize != sizeof(struct ctdb_get_log_addr)) {
2128                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2129                 return;
2130         }
2131         log_addr = (struct ctdb_get_log_addr *)data.dptr;
2132
2133         child = ctdb_fork_no_free_ringbuffer(ctdb);
2134         if (child == (pid_t)-1) {
2135                 DEBUG(DEBUG_ERR,("Failed to fork a log collector child\n"));
2136                 return;
2137         }
2138
2139         if (child == 0) {
2140                 if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
2141                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch log collector child into client mode.\n"));
2142                         _exit(1);
2143                 }
2144                 ctdb_collect_log(ctdb, log_addr);
2145                 _exit(0);
2146         }
2147 }
2148
2149 /*
2150   handler for clearlog
2151 */
2152 static void clearlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2153                              TDB_DATA data, void *private_data)
2154 {
2155         ctdb_clear_log(ctdb);
2156 }
2157
2158 /*
2159   handler for reload_nodes
2160 */
2161 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2162                              TDB_DATA data, void *private_data)
2163 {
2164         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2165
2166         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2167
2168         reload_nodes_file(rec->ctdb);
2169 }
2170
2171
2172 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
2173                               struct timeval yt, void *p)
2174 {
2175         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2176
2177         talloc_free(rec->ip_check_disable_ctx);
2178         rec->ip_check_disable_ctx = NULL;
2179 }
2180
2181
2182 static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
2183                                   struct timeval t, void *p)
2184 {
2185         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2186         struct ctdb_context *ctdb = rec->ctdb;
2187         int ret;
2188
2189         DEBUG(DEBUG_NOTICE,("Rebalance all nodes that have had ip assignment changes.\n"));
2190
2191         ret = ctdb_takeover_run(ctdb, rec->nodemap, takeover_fail_callback, NULL);
2192         if (ret != 0) {
2193                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
2194                 rec->need_takeover_run = true;
2195         }
2196
2197         talloc_free(rec->deferred_rebalance_ctx);
2198         rec->deferred_rebalance_ctx = NULL;
2199 }
2200
2201         
2202 static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2203                              TDB_DATA data, void *private_data)
2204 {
2205         uint32_t pnn;
2206         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2207
2208         if (data.dsize != sizeof(uint32_t)) {
2209                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2210                 return;
2211         }
2212
2213         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2214                 return;
2215         }
2216
2217         pnn = *(uint32_t *)&data.dptr[0];
2218
2219         lcp2_forcerebalance(ctdb, pnn);
2220         DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
2221
2222         if (rec->deferred_rebalance_ctx != NULL) {
2223                 talloc_free(rec->deferred_rebalance_ctx);
2224         }
2225         rec->deferred_rebalance_ctx = talloc_new(rec);
2226         event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
2227                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2228                         ctdb_rebalance_timeout, rec);
2229 }
2230
2231
2232
2233 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2234                              TDB_DATA data, void *private_data)
2235 {
2236         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2237         struct ctdb_public_ip *ip;
2238
2239         if (rec->recmaster != rec->ctdb->pnn) {
2240                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2241                 return;
2242         }
2243
2244         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2245                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2246                 return;
2247         }
2248
2249         ip = (struct ctdb_public_ip *)data.dptr;
2250
2251         update_ip_assignment_tree(rec->ctdb, ip);
2252 }
2253
2254
2255 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2256                              TDB_DATA data, void *private_data)
2257 {
2258         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2259         uint32_t timeout;
2260
2261         if (rec->ip_check_disable_ctx != NULL) {
2262                 talloc_free(rec->ip_check_disable_ctx);
2263                 rec->ip_check_disable_ctx = NULL;
2264         }
2265
2266         if (data.dsize != sizeof(uint32_t)) {
2267                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2268                                  "expexting %lu\n", (long unsigned)data.dsize,
2269                                  (long unsigned)sizeof(uint32_t)));
2270                 return;
2271         }
2272         if (data.dptr == NULL) {
2273                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
2274                 return;
2275         }
2276
2277         timeout = *((uint32_t *)data.dptr);
2278
2279         if (timeout == 0) {
2280                 DEBUG(DEBUG_NOTICE,("Reenabling ip check\n"));
2281                 return;
2282         }
2283                 
2284         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
2285
2286         rec->ip_check_disable_ctx = talloc_new(rec);
2287         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2288
2289         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2290 }
2291
2292
2293 /*
2294   handler for reload all ips.
2295 */
2296 static void ip_reloadall_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2297                              TDB_DATA data, void *private_data)
2298 {
2299         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2300
2301         if (data.dsize != sizeof(struct reloadips_all_reply)) {
2302                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2303                 return;
2304         }
2305
2306         reload_all_ips_request = (struct reloadips_all_reply *)talloc_steal(rec, data.dptr);
2307
2308         DEBUG(DEBUG_NOTICE,("RELOAD_ALL_IPS message received from node:%d srvid:%d\n", reload_all_ips_request->pnn, (int)reload_all_ips_request->srvid));
2309         return;
2310 }
2311
2312 static void async_reloadips_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2313 {
2314         uint32_t *status = callback_data;
2315
2316         if (res != 0) {
2317                 DEBUG(DEBUG_ERR,("Reload ips all failed on node %d\n", node_pnn));
2318                 *status = 1;
2319         }
2320 }
2321
2322 static int
2323 reload_all_ips(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, struct reloadips_all_reply *rips)
2324 {
2325         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2326         uint32_t *nodes;
2327         uint32_t status;
2328         int i;
2329
2330         DEBUG(DEBUG_ERR,("RELOAD ALL IPS on all active nodes\n"));
2331         for (i = 0; i< nodemap->num; i++) {
2332                 if (nodemap->nodes[i].flags != 0) {
2333                         DEBUG(DEBUG_ERR, ("Can not reload ips on all nodes. Node %d is not up and healthy\n", i));
2334                         talloc_free(tmp_ctx);
2335                         return -1;
2336                 }
2337         }
2338
2339         /* send the flags update to all connected nodes */
2340         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2341         status = 0;
2342         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
2343                                         nodes, 0,
2344                                         CONTROL_TIMEOUT(),
2345                                         false, tdb_null,
2346                                         async_reloadips_callback, NULL,
2347                                         &status) != 0) {
2348                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2349                 talloc_free(tmp_ctx);
2350                 return -1;
2351         }
2352
2353         if (status != 0) {
2354                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2355                 talloc_free(tmp_ctx);
2356                 return -1;
2357         }
2358
2359         ctdb_client_send_message(ctdb, rips->pnn, rips->srvid, tdb_null);
2360
2361         talloc_free(tmp_ctx);
2362         return 0;
2363 }
2364
2365
2366 /*
2367   handler for ip reallocate, just add it to the list of callers and 
2368   handle this later in the monitor_cluster loop so we do not recurse
2369   with other callers to takeover_run()
2370 */
2371 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2372                              TDB_DATA data, void *private_data)
2373 {
2374         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2375         struct ip_reallocate_list *caller;
2376
2377         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2378                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2379                 return;
2380         }
2381
2382         if (rec->ip_reallocate_ctx == NULL) {
2383                 rec->ip_reallocate_ctx = talloc_new(rec);
2384                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2385         }
2386
2387         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2388         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2389
2390         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2391         caller->next = rec->reallocate_callers;
2392         rec->reallocate_callers = caller;
2393
2394         return;
2395 }
2396
2397 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2398 {
2399         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2400         TDB_DATA result;
2401         int32_t ret;
2402         struct ip_reallocate_list *callers;
2403         uint32_t culprit;
2404
2405         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2406
2407         /* update the list of public ips that a node can handle for
2408            all connected nodes
2409         */
2410         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2411         if (ret != 0) {
2412                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2413                                  culprit));
2414                 rec->need_takeover_run = true;
2415         }
2416         if (ret == 0) {
2417                 ret = ctdb_takeover_run(ctdb, rec->nodemap, takeover_fail_callback, NULL);
2418                 if (ret != 0) {
2419                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2420                         rec->need_takeover_run = true;
2421                 }
2422         }
2423
2424         result.dsize = sizeof(int32_t);
2425         result.dptr  = (uint8_t *)&ret;
2426
2427         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2428
2429                 /* Someone that sent srvid==0 does not want a reply */
2430                 if (callers->rd->srvid == 0) {
2431                         continue;
2432                 }
2433                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2434                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2435                                   (unsigned long long)callers->rd->srvid));
2436                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2437                 if (ret != 0) {
2438                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2439                                          "message to %u:%llu\n",
2440                                          (unsigned)callers->rd->pnn,
2441                                          (unsigned long long)callers->rd->srvid));
2442                 }
2443         }
2444
2445         talloc_free(tmp_ctx);
2446         talloc_free(rec->ip_reallocate_ctx);
2447         rec->ip_reallocate_ctx = NULL;
2448         rec->reallocate_callers = NULL;
2449         
2450 }
2451
2452
2453 /*
2454   handler for recovery master elections
2455 */
2456 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2457                              TDB_DATA data, void *private_data)
2458 {
2459         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2460         int ret;
2461         struct election_message *em = (struct election_message *)data.dptr;
2462         TALLOC_CTX *mem_ctx;
2463
2464         /* we got an election packet - update the timeout for the election */
2465         talloc_free(rec->election_timeout);
2466         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2467                                                 fast_start ?
2468                                                 timeval_current_ofs(0, 500000) :
2469                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2470                                                 ctdb_election_timeout, rec);
2471
2472         mem_ctx = talloc_new(ctdb);
2473
2474         /* someone called an election. check their election data
2475            and if we disagree and we would rather be the elected node, 
2476            send a new election message to all other nodes
2477          */
2478         if (ctdb_election_win(rec, em)) {
2479                 if (!rec->send_election_te) {
2480                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2481                                                                 timeval_current_ofs(0, 500000),
2482                                                                 election_send_request, rec);
2483                 }
2484                 talloc_free(mem_ctx);
2485                 /*unban_all_nodes(ctdb);*/
2486                 return;
2487         }
2488         
2489         /* we didn't win */
2490         talloc_free(rec->send_election_te);
2491         rec->send_election_te = NULL;
2492
2493         if (ctdb->tunable.verify_recovery_lock != 0) {
2494                 /* release the recmaster lock */
2495                 if (em->pnn != ctdb->pnn &&
2496                     ctdb->recovery_lock_fd != -1) {
2497                         close(ctdb->recovery_lock_fd);
2498                         ctdb->recovery_lock_fd = -1;
2499                         unban_all_nodes(ctdb);
2500                 }
2501         }
2502
2503         /* ok, let that guy become recmaster then */
2504         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2505         if (ret != 0) {
2506                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2507                 talloc_free(mem_ctx);
2508                 return;
2509         }
2510
2511         talloc_free(mem_ctx);
2512         return;
2513 }
2514
2515
2516 /*
2517   force the start of the election process
2518  */
2519 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2520                            struct ctdb_node_map *nodemap)
2521 {
2522         int ret;
2523         struct ctdb_context *ctdb = rec->ctdb;
2524
2525         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2526
2527         /* set all nodes to recovery mode to stop all internode traffic */
2528         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2529         if (ret != 0) {
2530                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2531                 return;
2532         }
2533
2534         talloc_free(rec->election_timeout);
2535         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2536                                                 fast_start ?
2537                                                 timeval_current_ofs(0, 500000) :
2538                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2539                                                 ctdb_election_timeout, rec);
2540
2541         ret = send_election_request(rec, pnn, true);
2542         if (ret!=0) {
2543                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2544                 return;
2545         }
2546
2547         /* wait for a few seconds to collect all responses */
2548         ctdb_wait_election(rec);
2549 }
2550
2551
2552
2553 /*
2554   handler for when a node changes its flags
2555 */
2556 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2557                             TDB_DATA data, void *private_data)
2558 {
2559         int ret;
2560         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2561         struct ctdb_node_map *nodemap=NULL;
2562         TALLOC_CTX *tmp_ctx;
2563         int i;
2564         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2565         int disabled_flag_changed;
2566
2567         if (data.dsize != sizeof(*c)) {
2568                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2569                 return;
2570         }
2571
2572         tmp_ctx = talloc_new(ctdb);
2573         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2574
2575         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2576         if (ret != 0) {
2577                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2578                 talloc_free(tmp_ctx);
2579                 return;         
2580         }
2581
2582
2583         for (i=0;i<nodemap->num;i++) {
2584                 if (nodemap->nodes[i].pnn == c->pnn) break;
2585         }
2586
2587         if (i == nodemap->num) {
2588                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2589                 talloc_free(tmp_ctx);
2590                 return;
2591         }
2592
2593         if (nodemap->nodes[i].flags != c->new_flags) {
2594                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, nodemap->nodes[i].flags));
2595         }
2596
2597         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2598
2599         nodemap->nodes[i].flags = c->new_flags;
2600
2601         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2602                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2603
2604         if (ret == 0) {
2605                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2606                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2607         }
2608         
2609         if (ret == 0 &&
2610             ctdb->recovery_master == ctdb->pnn &&
2611             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2612                 /* Only do the takeover run if the perm disabled or unhealthy
2613                    flags changed since these will cause an ip failover but not
2614                    a recovery.
2615                    If the node became disconnected or banned this will also
2616                    lead to an ip address failover but that is handled 
2617                    during recovery
2618                 */
2619                 if (disabled_flag_changed) {
2620                         rec->need_takeover_run = true;
2621                 }
2622         }
2623
2624         talloc_free(tmp_ctx);
2625 }
2626
2627 /*
2628   handler for when we need to push out flag changes ot all other nodes
2629 */
2630 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2631                             TDB_DATA data, void *private_data)
2632 {
2633         int ret;
2634         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2635         struct ctdb_node_map *nodemap=NULL;
2636         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2637         uint32_t recmaster;
2638         uint32_t *nodes;
2639
2640         /* find the recovery master */
2641         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2642         if (ret != 0) {
2643                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2644                 talloc_free(tmp_ctx);
2645                 return;
2646         }
2647
2648         /* read the node flags from the recmaster */
2649         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2650         if (ret != 0) {
2651                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2652                 talloc_free(tmp_ctx);
2653                 return;
2654         }
2655         if (c->pnn >= nodemap->num) {
2656                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2657                 talloc_free(tmp_ctx);
2658                 return;
2659         }
2660
2661         /* send the flags update to all connected nodes */
2662         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2663
2664         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2665                                       nodes, 0, CONTROL_TIMEOUT(),
2666                                       false, data,
2667                                       NULL, NULL,
2668                                       NULL) != 0) {
2669                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2670
2671                 talloc_free(tmp_ctx);
2672                 return;
2673         }
2674
2675         talloc_free(tmp_ctx);
2676 }
2677
2678
2679 struct verify_recmode_normal_data {
2680         uint32_t count;
2681         enum monitor_result status;
2682 };
2683
2684 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2685 {
2686         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2687
2688
2689         /* one more node has responded with recmode data*/
2690         rmdata->count--;
2691
2692         /* if we failed to get the recmode, then return an error and let
2693            the main loop try again.
2694         */
2695         if (state->state != CTDB_CONTROL_DONE) {
2696                 if (rmdata->status == MONITOR_OK) {
2697                         rmdata->status = MONITOR_FAILED;
2698                 }
2699                 return;
2700         }
2701
2702         /* if we got a response, then the recmode will be stored in the
2703            status field
2704         */
2705         if (state->status != CTDB_RECOVERY_NORMAL) {
2706                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2707                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2708         }
2709
2710         return;
2711 }
2712
2713
2714 /* verify that all nodes are in normal recovery mode */
2715 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2716 {
2717         struct verify_recmode_normal_data *rmdata;
2718         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2719         struct ctdb_client_control_state *state;
2720         enum monitor_result status;
2721         int j;
2722         
2723         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2724         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2725         rmdata->count  = 0;
2726         rmdata->status = MONITOR_OK;
2727
2728         /* loop over all active nodes and send an async getrecmode call to 
2729            them*/
2730         for (j=0; j<nodemap->num; j++) {
2731                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2732                         continue;
2733                 }
2734                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2735                                         CONTROL_TIMEOUT(), 
2736                                         nodemap->nodes[j].pnn);
2737                 if (state == NULL) {
2738                         /* we failed to send the control, treat this as 
2739                            an error and try again next iteration
2740                         */                      
2741                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2742                         talloc_free(mem_ctx);
2743                         return MONITOR_FAILED;
2744                 }
2745
2746                 /* set up the callback functions */
2747                 state->async.fn = verify_recmode_normal_callback;
2748                 state->async.private_data = rmdata;
2749
2750                 /* one more control to wait for to complete */
2751                 rmdata->count++;
2752         }
2753
2754
2755         /* now wait for up to the maximum number of seconds allowed
2756            or until all nodes we expect a response from has replied
2757         */
2758         while (rmdata->count > 0) {
2759                 event_loop_once(ctdb->ev);
2760         }
2761
2762         status = rmdata->status;
2763         talloc_free(mem_ctx);
2764         return status;
2765 }
2766
2767
2768 struct verify_recmaster_data {
2769         struct ctdb_recoverd *rec;
2770         uint32_t count;
2771         uint32_t pnn;
2772         enum monitor_result status;
2773 };
2774
2775 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2776 {
2777         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2778
2779
2780         /* one more node has responded with recmaster data*/
2781         rmdata->count--;
2782
2783         /* if we failed to get the recmaster, then return an error and let
2784            the main loop try again.
2785         */
2786         if (state->state != CTDB_CONTROL_DONE) {
2787                 if (rmdata->status == MONITOR_OK) {
2788                         rmdata->status = MONITOR_FAILED;
2789                 }
2790                 return;
2791         }
2792
2793         /* if we got a response, then the recmaster will be stored in the
2794            status field
2795         */
2796         if (state->status != rmdata->pnn) {
2797                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2798                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2799                 rmdata->status = MONITOR_ELECTION_NEEDED;
2800         }
2801
2802         return;
2803 }
2804
2805
2806 /* verify that all nodes agree that we are the recmaster */
2807 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2808 {
2809         struct ctdb_context *ctdb = rec->ctdb;
2810         struct verify_recmaster_data *rmdata;
2811         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2812         struct ctdb_client_control_state *state;
2813         enum monitor_result status;
2814         int j;
2815         
2816         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2817         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2818         rmdata->rec    = rec;
2819         rmdata->count  = 0;
2820         rmdata->pnn    = pnn;
2821         rmdata->status = MONITOR_OK;
2822
2823         /* loop over all active nodes and send an async getrecmaster call to 
2824            them*/
2825         for (j=0; j<nodemap->num; j++) {
2826                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2827                         continue;
2828                 }
2829                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2830                                         CONTROL_TIMEOUT(),
2831                                         nodemap->nodes[j].pnn);
2832                 if (state == NULL) {
2833                         /* we failed to send the control, treat this as 
2834                            an error and try again next iteration
2835                         */                      
2836                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2837                         talloc_free(mem_ctx);
2838                         return MONITOR_FAILED;
2839                 }
2840
2841                 /* set up the callback functions */
2842                 state->async.fn = verify_recmaster_callback;
2843                 state->async.private_data = rmdata;
2844
2845                 /* one more control to wait for to complete */
2846                 rmdata->count++;
2847         }
2848
2849
2850         /* now wait for up to the maximum number of seconds allowed
2851            or until all nodes we expect a response from has replied
2852         */
2853         while (rmdata->count > 0) {
2854                 event_loop_once(ctdb->ev);
2855         }
2856
2857         status = rmdata->status;
2858         talloc_free(mem_ctx);
2859         return status;
2860 }
2861
2862 static bool interfaces_have_changed(struct ctdb_context *ctdb,
2863                                     struct ctdb_recoverd *rec)
2864 {
2865         struct ctdb_control_get_ifaces *ifaces = NULL;
2866         TALLOC_CTX *mem_ctx;
2867         bool ret = false;
2868
2869         mem_ctx = talloc_new(NULL);
2870
2871         /* Read the interfaces from the local node */
2872         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
2873                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
2874                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
2875                 /* We could return an error.  However, this will be
2876                  * rare so we'll decide that the interfaces have
2877                  * actually changed, just in case.
2878                  */
2879                 talloc_free(mem_ctx);
2880                 return true;
2881         }
2882
2883         if (!rec->ifaces) {
2884                 /* We haven't been here before so things have changed */
2885                 ret = true;
2886         } else if (rec->ifaces->num != ifaces->num) {
2887                 /* Number of interfaces has changed */
2888                 ret = true;
2889         } else {
2890                 /* See if interface names or link states have changed */
2891                 int i;
2892                 for (i = 0; i < rec->ifaces->num; i++) {
2893                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
2894                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0 ||
2895                             iface->link_state != ifaces->ifaces[i].link_state) {
2896                                 ret = true;
2897                                 break;
2898                         }
2899                 }
2900         }
2901
2902         talloc_free(rec->ifaces);
2903         rec->ifaces = talloc_steal(rec, ifaces);
2904
2905         talloc_free(mem_ctx);
2906         return ret;
2907 }
2908
2909 /* called to check that the local allocation of public ip addresses is ok.
2910 */
2911 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2912 {
2913         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2914         struct ctdb_uptime *uptime1 = NULL;
2915         struct ctdb_uptime *uptime2 = NULL;
2916         int ret, j;
2917         bool need_takeover_run = false;
2918
2919         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2920                                 CTDB_CURRENT_NODE, &uptime1);
2921         if (ret != 0) {
2922                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2923                 talloc_free(mem_ctx);
2924                 return -1;
2925         }
2926
2927         if (interfaces_have_changed(ctdb, rec)) {
2928                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2929                                      "local node %u - force takeover run\n",
2930                                      pnn));
2931                 need_takeover_run = true;
2932         }
2933
2934         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2935                                 CTDB_CURRENT_NODE, &uptime2);
2936         if (ret != 0) {
2937                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2938                 talloc_free(mem_ctx);
2939                 return -1;
2940         }
2941
2942         /* skip the check if the startrecovery time has changed */
2943         if (timeval_compare(&uptime1->last_recovery_started,
2944                             &uptime2->last_recovery_started) != 0) {
2945                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2946                 talloc_free(mem_ctx);
2947                 return 0;
2948         }
2949
2950         /* skip the check if the endrecovery time has changed */
2951         if (timeval_compare(&uptime1->last_recovery_finished,
2952                             &uptime2->last_recovery_finished) != 0) {
2953                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2954                 talloc_free(mem_ctx);
2955                 return 0;
2956         }
2957
2958         /* skip the check if we have started but not finished recovery */
2959         if (timeval_compare(&uptime1->last_recovery_finished,
2960                             &uptime1->last_recovery_started) != 1) {
2961                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2962                 talloc_free(mem_ctx);
2963
2964                 return 0;
2965         }
2966
2967         /* verify that we have the ip addresses we should have
2968            and we dont have ones we shouldnt have.
2969            if we find an inconsistency we set recmode to
2970            active on the local node and wait for the recmaster
2971            to do a full blown recovery.
2972            also if the pnn is -1 and we are healthy and can host the ip
2973            we also request a ip reallocation.
2974         */
2975         if (ctdb->tunable.disable_ip_failover == 0) {
2976                 struct ctdb_all_public_ips *ips = NULL;
2977
2978                 /* read the *available* IPs from the local node */
2979                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
2980                 if (ret != 0) {
2981                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
2982                         talloc_free(mem_ctx);
2983                         return -1;
2984                 }
2985
2986                 for (j=0; j<ips->num; j++) {
2987                         if (ips->ips[j].pnn == -1 &&
2988                             nodemap->nodes[pnn].flags == 0) {
2989                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
2990                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
2991                                 need_takeover_run = true;
2992                         }
2993                 }
2994
2995                 talloc_free(ips);
2996
2997                 /* read the *known* IPs from the local node */
2998                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
2999                 if (ret != 0) {
3000                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3001                         talloc_free(mem_ctx);
3002                         return -1;
3003                 }
3004
3005                 for (j=0; j<ips->num; j++) {
3006                         if (ips->ips[j].pnn == pnn) {
3007                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3008                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3009                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3010                                         need_takeover_run = true;
3011                                 }
3012                         } else {
3013                                 if (ctdb->do_checkpublicip &&
3014                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3015
3016                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3017                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3018
3019                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3020                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3021                                         }
3022                                 }
3023                         }
3024                 }
3025         }
3026
3027         if (need_takeover_run) {
3028                 struct takeover_run_reply rd;
3029                 TDB_DATA data;
3030
3031                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3032
3033                 rd.pnn = ctdb->pnn;
3034                 rd.srvid = 0;
3035                 data.dptr = (uint8_t *)&rd;
3036                 data.dsize = sizeof(rd);
3037
3038                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3039                 if (ret != 0) {
3040                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3041                 }
3042         }
3043         talloc_free(mem_ctx);
3044         return 0;
3045 }
3046
3047
3048 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3049 {
3050         struct ctdb_node_map **remote_nodemaps = callback_data;
3051
3052         if (node_pnn >= ctdb->num_nodes) {
3053                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3054                 return;
3055         }
3056
3057         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3058
3059 }
3060
3061 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3062         struct ctdb_node_map *nodemap,
3063         struct ctdb_node_map **remote_nodemaps)
3064 {
3065         uint32_t *nodes;
3066
3067         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3068         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3069                                         nodes, 0,
3070                                         CONTROL_TIMEOUT(), false, tdb_null,
3071                                         async_getnodemap_callback,
3072                                         NULL,
3073                                         remote_nodemaps) != 0) {
3074                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3075
3076                 return -1;
3077         }
3078
3079         return 0;
3080 }
3081
3082 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
3083 struct ctdb_check_reclock_state {
3084         struct ctdb_context *ctdb;
3085         struct timeval start_time;
3086         int fd[2];
3087         pid_t child;
3088         struct timed_event *te;
3089         struct fd_event *fde;
3090         enum reclock_child_status status;
3091 };
3092
3093 /* when we free the reclock state we must kill any child process.
3094 */
3095 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
3096 {
3097         struct ctdb_context *ctdb = state->ctdb;
3098
3099         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
3100
3101         if (state->fd[0] != -1) {
3102                 close(state->fd[0]);
3103                 state->fd[0] = -1;
3104         }
3105         if (state->fd[1] != -1) {
3106                 close(state->fd[1]);
3107                 state->fd[1] = -1;
3108         }
3109         ctdb_kill(ctdb, state->child, SIGKILL);
3110         return 0;
3111 }
3112
3113 /*
3114   called if our check_reclock child times out. this would happen if
3115   i/o to the reclock file blocks.
3116  */
3117 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
3118                                          struct timeval t, void *private_data)
3119 {
3120         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
3121                                            struct ctdb_check_reclock_state);
3122
3123         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
3124         state->status = RECLOCK_TIMEOUT;
3125 }
3126
3127 /* this is called when the child process has completed checking the reclock
3128    file and has written data back to us through the pipe.
3129 */
3130 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
3131                              uint16_t flags, void *private_data)
3132 {
3133         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
3134                                              struct ctdb_check_reclock_state);
3135         char c = 0;
3136         int ret;
3137
3138         /* we got a response from our child process so we can abort the
3139            timeout.
3140         */
3141         talloc_free(state->te);
3142         state->te = NULL;
3143
3144         ret = read(state->fd[0], &c, 1);
3145         if (ret != 1 || c != RECLOCK_OK) {
3146                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3147                 state->status = RECLOCK_FAILED;
3148
3149                 return;
3150         }
3151
3152         state->status = RECLOCK_OK;
3153         return;
3154 }
3155
3156 static int check_recovery_lock(struct ctdb_context *ctdb)
3157 {
3158         int ret;
3159         struct ctdb_check_reclock_state *state;
3160         pid_t parent = getpid();
3161
3162         if (ctdb->recovery_lock_fd == -1) {
3163                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3164                 return -1;
3165         }
3166
3167         state = talloc(ctdb, struct ctdb_check_reclock_state);
3168         CTDB_NO_MEMORY(ctdb, state);
3169
3170         state->ctdb = ctdb;
3171         state->start_time = timeval_current();
3172         state->status = RECLOCK_CHECKING;
3173         state->fd[0] = -1;
3174         state->fd[1] = -1;
3175
3176         ret = pipe(state->fd);
3177         if (ret != 0) {
3178                 talloc_free(state);
3179                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3180                 return -1;
3181         }
3182
3183         state->child = ctdb_fork(ctdb);
3184         if (state->child == (pid_t)-1) {
3185                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3186                 close(state->fd[0]);
3187                 state->fd[0] = -1;
3188                 close(state->fd[1]);
3189                 state->fd[1] = -1;
3190                 talloc_free(state);
3191                 return -1;
3192         }
3193
3194         if (state->child == 0) {
3195                 char cc = RECLOCK_OK;
3196                 close(state->fd[0]);
3197                 state->fd[0] = -1;
3198
3199                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3200                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3201                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3202                         cc = RECLOCK_FAILED;
3203                 }
3204
3205                 write(state->fd[1], &cc, 1);
3206                 /* make sure we die when our parent dies */
3207                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3208                         sleep(5);
3209                         write(state->fd[1], &cc, 1);
3210                 }
3211                 _exit(0);
3212         }
3213         close(state->fd[1]);
3214         state->fd[1] = -1;
3215         set_close_on_exec(state->fd[0]);
3216
3217         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3218
3219         talloc_set_destructor(state, check_reclock_destructor);
3220
3221         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3222                                     ctdb_check_reclock_timeout, state);
3223         if (state->te == NULL) {
3224                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3225                 talloc_free(state);
3226                 return -1;
3227         }
3228
3229         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3230                                 EVENT_FD_READ,
3231                                 reclock_child_handler,
3232                                 (void *)state);
3233
3234         if (state->fde == NULL) {
3235                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3236                 talloc_free(state);
3237                 return -1;
3238         }
3239         tevent_fd_set_auto_close(state->fde);
3240
3241         while (state->status == RECLOCK_CHECKING) {
3242                 event_loop_once(ctdb->ev);
3243         }
3244
3245         if (state->status == RECLOCK_FAILED) {
3246                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3247                 close(ctdb->recovery_lock_fd);
3248                 ctdb->recovery_lock_fd = -1;
3249                 talloc_free(state);
3250                 return -1;
3251         }
3252
3253         talloc_free(state);
3254         return 0;
3255 }
3256
3257 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3258 {
3259         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3260         const char *reclockfile;
3261
3262         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3263                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3264                 talloc_free(tmp_ctx);
3265                 return -1;      
3266         }
3267
3268         if (reclockfile == NULL) {
3269                 if (ctdb->recovery_lock_file != NULL) {
3270                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3271                         talloc_free(ctdb->recovery_lock_file);
3272                         ctdb->recovery_lock_file = NULL;
3273                         if (ctdb->recovery_lock_fd != -1) {
3274                                 close(ctdb->recovery_lock_fd);
3275                                 ctdb->recovery_lock_fd = -1;
3276                         }
3277                 }
3278                 ctdb->tunable.verify_recovery_lock = 0;
3279                 talloc_free(tmp_ctx);
3280                 return 0;
3281         }
3282
3283         if (ctdb->recovery_lock_file == NULL) {
3284                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3285                 if (ctdb->recovery_lock_fd != -1) {
3286                         close(ctdb->recovery_lock_fd);
3287                         ctdb->recovery_lock_fd = -1;
3288                 }
3289                 talloc_free(tmp_ctx);
3290                 return 0;
3291         }
3292
3293
3294         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3295                 talloc_free(tmp_ctx);
3296                 return 0;
3297         }
3298
3299         talloc_free(ctdb->recovery_lock_file);
3300         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3301         ctdb->tunable.verify_recovery_lock = 0;
3302         if (ctdb->recovery_lock_fd != -1) {
3303                 close(ctdb->recovery_lock_fd);
3304                 ctdb->recovery_lock_fd = -1;
3305         }
3306
3307         talloc_free(tmp_ctx);
3308         return 0;
3309 }
3310
3311 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3312                       TALLOC_CTX *mem_ctx)
3313 {
3314         uint32_t pnn;
3315         struct ctdb_node_map *nodemap=NULL;
3316         struct ctdb_node_map *recmaster_nodemap=NULL;
3317         struct ctdb_node_map **remote_nodemaps=NULL;
3318         struct ctdb_vnn_map *vnnmap=NULL;
3319         struct ctdb_vnn_map *remote_vnnmap=NULL;
3320         int32_t debug_level;
3321         int i, j, ret;
3322
3323
3324
3325         /* verify that the main daemon is still running */
3326         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3327                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3328                 exit(-1);
3329         }
3330
3331         /* ping the local daemon to tell it we are alive */
3332         ctdb_ctrl_recd_ping(ctdb);
3333
3334         if (rec->election_timeout) {
3335                 /* an election is in progress */
3336                 return;
3337         }
3338
3339         /* read the debug level from the parent and update locally */
3340         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3341         if (ret !=0) {
3342                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3343                 return;
3344         }
3345         LogLevel = debug_level;
3346
3347
3348         /* We must check if we need to ban a node here but we want to do this
3349            as early as possible so we dont wait until we have pulled the node
3350            map from the local node. thats why we have the hardcoded value 20
3351         */
3352         for (i=0; i<ctdb->num_nodes; i++) {
3353                 struct ctdb_banning_state *ban_state;
3354
3355                 if (ctdb->nodes[i]->ban_state == NULL) {
3356                         continue;
3357                 }
3358                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
3359                 if (ban_state->count < 20) {
3360                         continue;
3361                 }
3362                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
3363                         ctdb->nodes[i]->pnn, ban_state->count,
3364                         ctdb->tunable.recovery_ban_period));
3365                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
3366                 ban_state->count = 0;
3367         }
3368
3369         /* get relevant tunables */
3370         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3371         if (ret != 0) {
3372                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3373                 return;
3374         }
3375
3376         /* get the current recovery lock file from the server */
3377         if (update_recovery_lock_file(ctdb) != 0) {
3378                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3379                 return;
3380         }
3381
3382         /* Make sure that if recovery lock verification becomes disabled when
3383            we close the file
3384         */
3385         if (ctdb->tunable.verify_recovery_lock == 0) {
3386                 if (ctdb->recovery_lock_fd != -1) {
3387                         close(ctdb->recovery_lock_fd);
3388                         ctdb->recovery_lock_fd = -1;
3389                 }
3390         }
3391
3392         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3393         if (pnn == (uint32_t)-1) {
3394                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
3395                 return;
3396         }
3397
3398         /* get the vnnmap */
3399         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3400         if (ret != 0) {
3401                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3402                 return;
3403         }
3404
3405
3406         /* get number of nodes */
3407         if (rec->nodemap) {
3408                 talloc_free(rec->nodemap);
3409                 rec->nodemap = NULL;
3410                 nodemap=NULL;
3411         }
3412         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3413         if (ret != 0) {
3414                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3415                 return;
3416         }
3417         nodemap = rec->nodemap;
3418
3419         /* remember our own node flags */
3420         rec->node_flags = nodemap->nodes[pnn].flags;
3421
3422         /* update the capabilities for all nodes */
3423         ret = update_capabilities(ctdb, nodemap);
3424         if (ret != 0) {
3425                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3426                 return;
3427         }
3428
3429         /* check which node is the recovery master */
3430         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3431         if (ret != 0) {
3432                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3433                 return;
3434         }
3435
3436         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3437         if (rec->recmaster != pnn) {
3438                 if (rec->ip_reallocate_ctx != NULL) {
3439                         talloc_free(rec->ip_reallocate_ctx);
3440                         rec->ip_reallocate_ctx = NULL;
3441                         rec->reallocate_callers = NULL;
3442                 }
3443         }
3444
3445         if (rec->recmaster == (uint32_t)-1) {
3446                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3447                 force_election(rec, pnn, nodemap);
3448                 return;
3449         }
3450
3451         /* if the local daemon is STOPPED, we verify that the databases are
3452            also frozen and thet the recmode is set to active 
3453         */
3454         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3455                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3456                 if (ret != 0) {
3457                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3458                 }
3459                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3460                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3461
3462                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3463                         if (ret != 0) {
3464                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED state\n"));
3465                                 return;
3466                         }
3467                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3468                         if (ret != 0) {
3469                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED state\n"));
3470
3471                                 return;
3472                         }
3473                         return;
3474                 }
3475         }
3476         /* If the local node is stopped, verify we are not the recmaster 
3477            and yield this role if so
3478         */
3479         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE) && (rec->recmaster == pnn)) {
3480                 DEBUG(DEBUG_ERR,("Local node is INACTIVE. Yielding recmaster role\n"));
3481                 force_election(rec, pnn, nodemap);
3482                 return;
3483         }
3484         
3485         /*
3486          * if the current recmaster do not have CTDB_CAP_RECMASTER,
3487          * but we have force an election and try to become the new
3488          * recmaster
3489          */
3490         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3491             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3492              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3493                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3494                                   " but we (node %u) have - force an election\n",
3495                                   rec->recmaster, pnn));
3496                 force_election(rec, pnn, nodemap);
3497                 return;
3498         }
3499
3500         /* count how many active nodes there are */
3501         rec->num_active    = 0;
3502         rec->num_connected = 0;
3503         for (i=0; i<nodemap->num; i++) {
3504                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3505                         rec->num_active++;
3506                 }
3507                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3508                         rec->num_connected++;
3509                 }
3510         }
3511
3512
3513         /* verify that the recmaster node is still active */
3514         for (j=0; j<nodemap->num; j++) {
3515                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3516                         break;
3517                 }
3518         }
3519
3520         if (j == nodemap->num) {
3521                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3522                 force_election(rec, pnn, nodemap);
3523                 return;
3524         }
3525
3526         /* if recovery master is disconnected we must elect a new recmaster */
3527         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3528                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3529                 force_election(rec, pnn, nodemap);
3530                 return;
3531         }
3532
3533         /* get nodemap from the recovery master to check if it is inactive */
3534         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3535                                    mem_ctx, &recmaster_nodemap);
3536         if (ret != 0) {
3537                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3538                           nodemap->nodes[j].pnn));
3539                 return;
3540         }
3541
3542
3543         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3544             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3545                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3546                 force_election(rec, pnn, nodemap);
3547                 return;
3548         }
3549
3550         /* If this node is stopped then it is not the recovery master
3551          * so the only remaining action is to potentially to verify
3552          * the local IP allocation below.  This won't accomplish
3553          * anything useful so skip it.
3554          */
3555         if (rec->node_flags & NODE_FLAGS_STOPPED) {
3556                 return;
3557         }
3558
3559         /* verify that we have all ip addresses we should have and we dont
3560          * have addresses we shouldnt have.
3561          */ 
3562         if (ctdb->tunable.disable_ip_failover == 0) {
3563                 if (rec->ip_check_disable_ctx == NULL) {
3564                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3565                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3566                         }
3567                 }
3568         }
3569
3570
3571         /* if we are not the recmaster then we do not need to check
3572            if recovery is needed
3573          */
3574         if (pnn != rec->recmaster) {
3575                 return;
3576         }
3577
3578
3579         /* ensure our local copies of flags are right */
3580         ret = update_local_flags(rec, nodemap);
3581         if (ret == MONITOR_ELECTION_NEEDED) {
3582                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3583                 force_election(rec, pnn, nodemap);
3584                 return;
3585         }
3586         if (ret != MONITOR_OK) {
3587                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3588                 return;
3589         }
3590
3591         if (ctdb->num_nodes != nodemap->num) {
3592                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3593                 reload_nodes_file(ctdb);
3594                 return;
3595         }
3596
3597         /* verify that all active nodes agree that we are the recmaster */
3598         switch (verify_recmaster(rec, nodemap, pnn)) {
3599         case MONITOR_RECOVERY_NEEDED:
3600                 /* can not happen */
3601                 return;
3602         case MONITOR_ELECTION_NEEDED:
3603                 force_election(rec, pnn, nodemap);
3604                 return;
3605         case MONITOR_OK:
3606                 break;
3607         case MONITOR_FAILED:
3608                 return;
3609         }
3610
3611
3612         if (rec->need_recovery) {
3613                 /* a previous recovery didn't finish */
3614                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3615                 return;
3616         }
3617
3618         /* verify that all active nodes are in normal mode 
3619            and not in recovery mode 
3620         */
3621         switch (verify_recmode(ctdb, nodemap)) {
3622         case MONITOR_RECOVERY_NEEDED:
3623                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3624                 return;
3625         case MONITOR_FAILED:
3626                 return;
3627         case MONITOR_ELECTION_NEEDED:
3628                 /* can not happen */
3629         case MONITOR_OK:
3630                 break;
3631         }
3632
3633
3634         if (ctdb->tunable.verify_recovery_lock != 0) {
3635                 /* we should have the reclock - check its not stale */
3636                 ret = check_recovery_lock(ctdb);
3637                 if (ret != 0) {
3638                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3639                         ctdb_set_culprit(rec, ctdb->pnn);
3640                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3641                         return;
3642                 }
3643         }
3644
3645
3646         /* is there a pending reload all ips ? */
3647         if (reload_all_ips_request != NULL) {
3648                 reload_all_ips(ctdb, rec, nodemap, reload_all_ips_request);
3649                 talloc_free(reload_all_ips_request);
3650                 reload_all_ips_request = NULL;
3651         }
3652
3653         /* if there are takeovers requested, perform it and notify the waiters */
3654         if (rec->reallocate_callers) {
3655                 process_ipreallocate_requests(ctdb, rec);
3656         }
3657
3658         /* get the nodemap for all active remote nodes
3659          */
3660         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3661         if (remote_nodemaps == NULL) {
3662                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3663                 return;
3664         }
3665         for(i=0; i<nodemap->num; i++) {
3666                 remote_nodemaps[i] = NULL;
3667         }
3668         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3669                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3670                 return;
3671         } 
3672
3673         /* verify that all other nodes have the same nodemap as we have
3674         */
3675         for (j=0; j<nodemap->num; j++) {
3676                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3677                         continue;
3678                 }
3679
3680                 if (remote_nodemaps[j] == NULL) {
3681                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3682                         ctdb_set_culprit(rec, j);
3683
3684                         return;
3685                 }
3686
3687                 /* if the nodes disagree on how many nodes there are
3688                    then this is a good reason to try recovery
3689                  */
3690                 if (remote_nodemaps[j]->num != nodemap->num) {
3691                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3692                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3693                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3694                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3695                         return;
3696                 }
3697
3698                 /* if the nodes disagree on which nodes exist and are
3699                    active, then that is also a good reason to do recovery
3700                  */
3701                 for (i=0;i<nodemap->num;i++) {
3702                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3703                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3704                                           nodemap->nodes[j].pnn, i, 
3705                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3706                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3707                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3708                                             vnnmap);
3709                                 return;
3710                         }
3711                 }
3712
3713                 /* verify the flags are consistent
3714                 */
3715                 for (i=0; i<nodemap->num; i++) {
3716                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3717                                 continue;
3718                         }
3719                         
3720                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3721                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3722                                   nodemap->nodes[j].pnn, 
3723                                   nodemap->nodes[i].pnn, 
3724                                   remote_nodemaps[j]->nodes[i].flags,
3725                                   nodemap->nodes[i].flags));
3726                                 if (i == j) {
3727                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3728                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3729                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3730                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3731                                                     vnnmap);
3732                                         return;
3733                                 } else {
3734                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3735                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3736                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3737                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3738                                                     vnnmap);
3739                                         return;
3740                                 }
3741                         }
3742                 }
3743         }
3744
3745
3746         /* there better be the same number of lmasters in the vnn map
3747            as there are active nodes or we will have to do a recovery
3748          */
3749         if (vnnmap->size != rec->num_active) {
3750                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3751                           vnnmap->size, rec->num_active));
3752                 ctdb_set_culprit(rec, ctdb->pnn);
3753                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3754                 return;
3755         }
3756
3757         /* verify that all active nodes in the nodemap also exist in 
3758            the vnnmap.
3759          */
3760         for (j=0; j<nodemap->num; j++) {
3761                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3762                         continue;
3763                 }
3764                 if (nodemap->nodes[j].pnn == pnn) {
3765                         continue;
3766                 }
3767
3768                 for (i=0; i<vnnmap->size; i++) {
3769                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3770                                 break;
3771                         }
3772                 }
3773                 if (i == vnnmap->size) {
3774                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3775                                   nodemap->nodes[j].pnn));
3776                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3777                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3778                         return;
3779                 }
3780         }
3781
3782         
3783         /* verify that all other nodes have the same vnnmap
3784            and are from the same generation
3785          */
3786         for (j=0; j<nodemap->num; j++) {
3787                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3788                         continue;
3789                 }
3790                 if (nodemap->nodes[j].pnn == pnn) {
3791                         continue;
3792                 }
3793
3794                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3795                                           mem_ctx, &remote_vnnmap);
3796                 if (ret != 0) {
3797                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3798                                   nodemap->nodes[j].pnn));
3799                         return;
3800                 }
3801
3802                 /* verify the vnnmap generation is the same */
3803                 if (vnnmap->generation != remote_vnnmap->generation) {
3804                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3805                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3806                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3807                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3808                         return;
3809                 }
3810
3811                 /* verify the vnnmap size is the same */
3812                 if (vnnmap->size != remote_vnnmap->size) {
3813                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3814                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3815                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3816                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3817                         return;
3818                 }
3819
3820                 /* verify the vnnmap is the same */
3821                 for (i=0;i<vnnmap->size;i++) {
3822                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3823                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3824                                           nodemap->nodes[j].pnn));
3825                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3826                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3827                                             vnnmap);
3828                                 return;
3829                         }
3830                 }
3831         }
3832
3833         /* we might need to change who has what IP assigned */
3834         if (rec->need_takeover_run) {
3835                 uint32_t culprit = (uint32_t)-1;
3836
3837                 rec->need_takeover_run = false;
3838
3839                 /* update the list of public ips that a node can handle for
3840                    all connected nodes
3841                 */
3842                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3843                 if (ret != 0) {
3844                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3845                                          culprit));
3846                         rec->need_takeover_run = true;
3847                         return;
3848                 }
3849
3850                 /* execute the "startrecovery" event script on all nodes */
3851                 ret = run_startrecovery_eventscript(rec, nodemap);
3852                 if (ret!=0) {
3853                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3854                         ctdb_set_culprit(rec, ctdb->pnn);
3855                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3856                         return;
3857                 }
3858
3859                 /* If takeover run fails, then the offending nodes are
3860                  * assigned ban culprit counts. And we re-try takeover.
3861                  * If takeover run fails repeatedly, the node would get
3862                  * banned.
3863                  *
3864                  * If rec->need_takeover_run is not set to true at this
3865                  * failure, monitoring is disabled cluster-wide (via
3866                  * startrecovery eventscript) and will not get enabled.
3867                  */
3868                 ret = ctdb_takeover_run(ctdb, nodemap, takeover_fail_callback, rec);
3869                 if (ret != 0) {
3870                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Trying again\n"));
3871                         return;
3872                 }
3873
3874                 /* execute the "recovered" event script on all nodes */
3875                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
3876 #if 0
3877 // we cant check whether the event completed successfully
3878 // since this script WILL fail if the node is in recovery mode
3879 // and if that race happens, the code here would just cause a second
3880 // cascading recovery.
3881                 if (ret!=0) {
3882                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3883                         ctdb_set_culprit(rec, ctdb->pnn);
3884                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3885                 }
3886 #endif
3887         }
3888 }
3889
3890 /*
3891   the main monitoring loop
3892  */
3893 static void monitor_cluster(struct ctdb_context *ctdb)
3894 {
3895         struct ctdb_recoverd *rec;
3896
3897         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3898
3899         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3900         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3901
3902         rec->ctdb = ctdb;
3903
3904         rec->priority_time = timeval_current();
3905
3906         /* register a message port for sending memory dumps */
3907         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3908
3909         /* register a message port for requesting logs */
3910         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_GETLOG, getlog_handler, rec);
3911
3912         /* register a message port for clearing logs */
3913         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_CLEARLOG, clearlog_handler, rec);
3914
3915         /* register a message port for recovery elections */
3916         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3917
3918         /* when nodes are disabled/enabled */
3919         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3920
3921         /* when we are asked to puch out a flag change */
3922         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3923
3924         /* register a message port for vacuum fetch */
3925         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3926
3927         /* register a message port for reloadnodes  */
3928         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3929
3930         /* register a message port for performing a takeover run */
3931         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3932
3933         /* register a message port for performing a reload all ips */
3934         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_ALL_IPS, ip_reloadall_handler, rec);
3935
3936         /* register a message port for disabling the ip check for a short while */
3937         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3938
3939         /* register a message port for updating the recovery daemons node assignment for an ip */
3940         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3941
3942         /* register a message port for forcing a rebalance of a node next
3943            reallocation */
3944         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3945
3946         for (;;) {
3947                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3948                 struct timeval start;
3949                 double elapsed;
3950
3951                 if (!mem_ctx) {
3952                         DEBUG(DEBUG_CRIT,(__location__
3953                                           " Failed to create temp context\n"));
3954                         exit(-1);
3955                 }
3956
3957                 start = timeval_current();
3958                 main_loop(ctdb, rec, mem_ctx);
3959                 talloc_free(mem_ctx);
3960
3961                 /* we only check for recovery once every second */
3962                 elapsed = timeval_elapsed(&start);
3963                 if (elapsed < ctdb->tunable.recover_interval) {
3964                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3965                                           - elapsed);
3966                 }
3967         }
3968 }
3969
3970 /*
3971   event handler for when the main ctdbd dies
3972  */
3973 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3974                                  uint16_t flags, void *private_data)
3975 {
3976         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3977         _exit(1);
3978 }
3979
3980 /*
3981   called regularly to verify that the recovery daemon is still running
3982  */
3983 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3984                               struct timeval yt, void *p)
3985 {
3986         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3987
3988         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3989                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3990
3991                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3992                                 ctdb_restart_recd, ctdb);
3993
3994                 return;
3995         }
3996
3997         event_add_timed(ctdb->ev, ctdb->recd_ctx,
3998                         timeval_current_ofs(30, 0),
3999                         ctdb_check_recd, ctdb);
4000 }
4001
4002 static void recd_sig_child_handler(struct event_context *ev,
4003         struct signal_event *se, int signum, int count,
4004         void *dont_care, 
4005         void *private_data)
4006 {
4007 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4008         int status;
4009         pid_t pid = -1;
4010
4011         while (pid != 0) {
4012                 pid = waitpid(-1, &status, WNOHANG);
4013                 if (pid == -1) {
4014                         if (errno != ECHILD) {
4015                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4016                         }
4017                         return;
4018                 }
4019                 if (pid > 0) {
4020                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4021                 }
4022         }
4023 }
4024
4025 /*
4026   startup the recovery daemon as a child of the main ctdb daemon
4027  */
4028 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4029 {
4030         int fd[2];
4031         struct signal_event *se;
4032         struct tevent_fd *fde;
4033
4034         if (pipe(fd) != 0) {
4035                 return -1;
4036         }
4037
4038         ctdb->ctdbd_pid = getpid();
4039
4040         ctdb->recoverd_pid = ctdb_fork_no_free_ringbuffer(ctdb);
4041         if (ctdb->recoverd_pid == -1) {
4042                 return -1;
4043         }
4044
4045         if (ctdb->recoverd_pid != 0) {
4046                 talloc_free(ctdb->recd_ctx);
4047                 ctdb->recd_ctx = talloc_new(ctdb);
4048                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4049
4050                 close(fd[0]);
4051                 event_add_timed(ctdb->ev, ctdb->recd_ctx,
4052                                 timeval_current_ofs(30, 0),
4053                                 ctdb_check_recd, ctdb);
4054                 return 0;
4055         }
4056
4057         close(fd[1]);
4058
4059         srandom(getpid() ^ time(NULL));
4060
4061         /* Clear the log ringbuffer */
4062         ctdb_clear_log(ctdb);
4063
4064         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4065                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
4066                 exit(1);
4067         }
4068
4069         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4070
4071         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
4072                      ctdb_recoverd_parent, &fd[0]);
4073         tevent_fd_set_auto_close(fde);
4074
4075         /* set up a handler to pick up sigchld */
4076         se = event_add_signal(ctdb->ev, ctdb,
4077                                      SIGCHLD, 0,
4078                                      recd_sig_child_handler,
4079                                      ctdb);
4080         if (se == NULL) {
4081                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4082                 exit(1);
4083         }
4084
4085         monitor_cluster(ctdb);
4086
4087         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4088         return -1;
4089 }
4090
4091 /*
4092   shutdown the recovery daemon
4093  */
4094 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4095 {
4096         if (ctdb->recoverd_pid == 0) {
4097                 return;
4098         }
4099
4100         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4101         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4102
4103         TALLOC_FREE(ctdb->recd_ctx);
4104         TALLOC_FREE(ctdb->recd_ping_count);
4105 }
4106
4107 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
4108                        struct timeval t, void *private_data)
4109 {
4110         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4111
4112         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4113         ctdb_stop_recoverd(ctdb);
4114         ctdb_start_recoverd(ctdb);
4115 }