recovery daemon needs to monitor when the local ctdb daemon is stopped and ensure...
[metze/ctdb/wip.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 struct ban_state {
35         struct ctdb_recoverd *rec;
36         uint32_t banned_node;
37 };
38
39 /* list of "ctdb ipreallocate" processes to call back when we have
40    finished the takeover run.
41 */
42 struct ip_reallocate_list {
43         struct ip_reallocate_list *next;
44         struct rd_memdump_reply *rd;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         struct ctdb_node_map *nodemap;
56         uint32_t last_culprit;
57         uint32_t culprit_counter;
58         struct timeval first_recover_time;
59         struct ban_state **banned_nodes;
60         struct timeval priority_time;
61         bool need_takeover_run;
62         bool need_recovery;
63         uint32_t node_flags;
64         struct timed_event *send_election_te;
65         struct timed_event *election_timeout;
66         struct vacuum_info *vacuum_info;
67         TALLOC_CTX *ip_reallocate_ctx;
68         struct ip_reallocate_list *reallocate_callers;
69 };
70
71 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
72 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
73
74
75 /*
76   unban a node
77  */
78 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
79 {
80         struct ctdb_context *ctdb = rec->ctdb;
81
82         DEBUG(DEBUG_NOTICE,("Unbanning node %u\n", pnn));
83
84         if (!ctdb_validate_pnn(ctdb, pnn)) {
85                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_unban_node\n", pnn));
86                 return;
87         }
88
89         /* If we are unbanning a different node then just pass the ban info on */
90         if (pnn != ctdb->pnn) {
91                 TDB_DATA data;
92                 int ret;
93                 
94                 DEBUG(DEBUG_NOTICE,("Unanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
95
96                 data.dptr = (uint8_t *)&pnn;
97                 data.dsize = sizeof(uint32_t);
98
99                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_UNBAN_NODE, data);
100                 if (ret != 0) {
101                         DEBUG(DEBUG_ERR,("Failed to unban node %u\n", pnn));
102                         return;
103                 }
104
105                 return;
106         }
107
108         /* make sure we remember we are no longer banned in case 
109            there is an election */
110         rec->node_flags &= ~NODE_FLAGS_BANNED;
111
112         DEBUG(DEBUG_INFO,("Clearing ban flag on node %u\n", pnn));
113         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
114
115         if (rec->banned_nodes[pnn] == NULL) {
116                 DEBUG(DEBUG_INFO,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
117                 return;
118         }
119
120         talloc_free(rec->banned_nodes[pnn]);
121         rec->banned_nodes[pnn] = NULL;
122 }
123
124
125 /*
126   called when a ban has timed out
127  */
128 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
129 {
130         struct ban_state *state = talloc_get_type(p, struct ban_state);
131         struct ctdb_recoverd *rec = state->rec;
132         uint32_t pnn = state->banned_node;
133
134         DEBUG(DEBUG_NOTICE,("Ban timeout. Node %u is now unbanned\n", pnn));
135         ctdb_unban_node(rec, pnn);
136 }
137
138 /*
139   ban a node for a period of time
140  */
141 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
142 {
143         struct ctdb_context *ctdb = rec->ctdb;
144
145         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
146
147         if (!ctdb_validate_pnn(ctdb, pnn)) {
148                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
149                 return;
150         }
151
152         if (0 == ctdb->tunable.enable_bans) {
153                 DEBUG(DEBUG_INFO,("Bans are disabled - ignoring ban of node %u\n", pnn));
154                 return;
155         }
156
157         /* If we are banning a different node then just pass the ban info on */
158         if (pnn != ctdb->pnn) {
159                 struct ctdb_ban_info b;
160                 TDB_DATA data;
161                 int ret;
162                 
163                 DEBUG(DEBUG_NOTICE,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
164
165                 b.pnn = pnn;
166                 b.ban_time = ban_time;
167
168                 data.dptr = (uint8_t *)&b;
169                 data.dsize = sizeof(b);
170
171                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_BAN_NODE, data);
172                 if (ret != 0) {
173                         DEBUG(DEBUG_ERR,("Failed to ban node %u\n", pnn));
174                         return;
175                 }
176
177                 return;
178         }
179
180         DEBUG(DEBUG_NOTICE,("self ban - lowering our election priority\n"));
181         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
182
183         /* banning ourselves - lower our election priority */
184         rec->priority_time = timeval_current();
185
186         /* make sure we remember we are banned in case there is an 
187            election */
188         rec->node_flags |= NODE_FLAGS_BANNED;
189
190         if (rec->banned_nodes[pnn] != NULL) {
191                 DEBUG(DEBUG_NOTICE,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));            
192                 talloc_free(rec->banned_nodes[pnn]);
193                 rec->banned_nodes[pnn] = NULL;
194         }
195
196         rec->banned_nodes[pnn] = talloc(rec->banned_nodes, struct ban_state);
197         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
198
199         rec->banned_nodes[pnn]->rec = rec;
200         rec->banned_nodes[pnn]->banned_node = pnn;
201
202         if (ban_time != 0) {
203                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
204                                 timeval_current_ofs(ban_time, 0),
205                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
206         }
207 }
208
209 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
210
211
212 /*
213   run the "recovered" eventscript on all nodes
214  */
215 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
216 {
217         TALLOC_CTX *tmp_ctx;
218         uint32_t *nodes;
219
220         tmp_ctx = talloc_new(ctdb);
221         CTDB_NO_MEMORY(ctdb, tmp_ctx);
222
223         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
224         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
225                                         nodes,
226                                         CONTROL_TIMEOUT(), false, tdb_null,
227                                         NULL, NULL,
228                                         NULL) != 0) {
229                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
230
231                 talloc_free(tmp_ctx);
232                 return -1;
233         }
234
235         talloc_free(tmp_ctx);
236         return 0;
237 }
238
239 /*
240   remember the trouble maker
241  */
242 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
243 {
244         struct ctdb_context *ctdb = rec->ctdb;
245
246         if (rec->last_culprit != culprit ||
247             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
248                 DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
249                 /* either a new node is the culprit, or we've decided to forgive them */
250                 rec->last_culprit = culprit;
251                 rec->first_recover_time = timeval_current();
252                 rec->culprit_counter = 0;
253         }
254         rec->culprit_counter++;
255 }
256
257 /*
258   remember the trouble maker
259  */
260 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
261 {
262         struct ctdb_context *ctdb = rec->ctdb;
263
264         if (rec->last_culprit != culprit ||
265             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
266                 DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
267                 /* either a new node is the culprit, or we've decided to forgive them */
268                 rec->last_culprit = culprit;
269                 rec->first_recover_time = timeval_current();
270                 rec->culprit_counter = 0;
271         }
272         rec->culprit_counter += count;
273 }
274
275 /* this callback is called for every node that failed to execute the
276    start recovery event
277 */
278 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
279 {
280         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
281
282         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
283
284         ctdb_set_culprit(rec, node_pnn);
285 }
286
287 /*
288   run the "startrecovery" eventscript on all nodes
289  */
290 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
291 {
292         TALLOC_CTX *tmp_ctx;
293         uint32_t *nodes;
294         struct ctdb_context *ctdb = rec->ctdb;
295
296         tmp_ctx = talloc_new(ctdb);
297         CTDB_NO_MEMORY(ctdb, tmp_ctx);
298
299         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
300         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
301                                         nodes,
302                                         CONTROL_TIMEOUT(), false, tdb_null,
303                                         NULL,
304                                         startrecovery_fail_callback,
305                                         rec) != 0) {
306                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
307                 talloc_free(tmp_ctx);
308                 return -1;
309         }
310
311         talloc_free(tmp_ctx);
312         return 0;
313 }
314
315 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
316 {
317         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
318                 DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
319                 return;
320         }
321         if (node_pnn < ctdb->num_nodes) {
322                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
323         }
324 }
325
326 /*
327   update the node capabilities for all connected nodes
328  */
329 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
330 {
331         uint32_t *nodes;
332         TALLOC_CTX *tmp_ctx;
333
334         tmp_ctx = talloc_new(ctdb);
335         CTDB_NO_MEMORY(ctdb, tmp_ctx);
336
337         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
338         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
339                                         nodes, CONTROL_TIMEOUT(),
340                                         false, tdb_null,
341                                         async_getcap_callback, NULL,
342                                         NULL) != 0) {
343                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
344                 talloc_free(tmp_ctx);
345                 return -1;
346         }
347
348         talloc_free(tmp_ctx);
349         return 0;
350 }
351
352 /*
353   change recovery mode on all nodes
354  */
355 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
356 {
357         TDB_DATA data;
358         uint32_t *nodes;
359         TALLOC_CTX *tmp_ctx;
360
361         tmp_ctx = talloc_new(ctdb);
362         CTDB_NO_MEMORY(ctdb, tmp_ctx);
363
364         /* freeze all nodes */
365         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
366         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
367                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
368                                                 nodes, CONTROL_TIMEOUT(),
369                                                 false, tdb_null,
370                                                 NULL, NULL,
371                                                 NULL) != 0) {
372                         DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
373                         talloc_free(tmp_ctx);
374                         return -1;
375                 }
376         }
377
378
379         data.dsize = sizeof(uint32_t);
380         data.dptr = (unsigned char *)&rec_mode;
381
382         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
383                                         nodes, CONTROL_TIMEOUT(),
384                                         false, data,
385                                         NULL, NULL,
386                                         NULL) != 0) {
387                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
388                 talloc_free(tmp_ctx);
389                 return -1;
390         }
391
392         talloc_free(tmp_ctx);
393         return 0;
394 }
395
396 /*
397   change recovery master on all node
398  */
399 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
400 {
401         TDB_DATA data;
402         TALLOC_CTX *tmp_ctx;
403         uint32_t *nodes;
404
405         tmp_ctx = talloc_new(ctdb);
406         CTDB_NO_MEMORY(ctdb, tmp_ctx);
407
408         data.dsize = sizeof(uint32_t);
409         data.dptr = (unsigned char *)&pnn;
410
411         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
412         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
413                                         nodes,
414                                         CONTROL_TIMEOUT(), false, data,
415                                         NULL, NULL,
416                                         NULL) != 0) {
417                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
418                 talloc_free(tmp_ctx);
419                 return -1;
420         }
421
422         talloc_free(tmp_ctx);
423         return 0;
424 }
425
426
427 /*
428   ensure all other nodes have attached to any databases that we have
429  */
430 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
431                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
432 {
433         int i, j, db, ret;
434         struct ctdb_dbid_map *remote_dbmap;
435
436         /* verify that all other nodes have all our databases */
437         for (j=0; j<nodemap->num; j++) {
438                 /* we dont need to ourself ourselves */
439                 if (nodemap->nodes[j].pnn == pnn) {
440                         continue;
441                 }
442                 /* dont check nodes that are unavailable */
443                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
444                         continue;
445                 }
446
447                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
448                                          mem_ctx, &remote_dbmap);
449                 if (ret != 0) {
450                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
451                         return -1;
452                 }
453
454                 /* step through all local databases */
455                 for (db=0; db<dbmap->num;db++) {
456                         const char *name;
457
458
459                         for (i=0;i<remote_dbmap->num;i++) {
460                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
461                                         break;
462                                 }
463                         }
464                         /* the remote node already have this database */
465                         if (i!=remote_dbmap->num) {
466                                 continue;
467                         }
468                         /* ok so we need to create this database */
469                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
470                                             mem_ctx, &name);
471                         if (ret != 0) {
472                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
473                                 return -1;
474                         }
475                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
476                                            mem_ctx, name, dbmap->dbs[db].persistent);
477                         if (ret != 0) {
478                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
479                                 return -1;
480                         }
481                 }
482         }
483
484         return 0;
485 }
486
487
488 /*
489   ensure we are attached to any databases that anyone else is attached to
490  */
491 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
492                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
493 {
494         int i, j, db, ret;
495         struct ctdb_dbid_map *remote_dbmap;
496
497         /* verify that we have all database any other node has */
498         for (j=0; j<nodemap->num; j++) {
499                 /* we dont need to ourself ourselves */
500                 if (nodemap->nodes[j].pnn == pnn) {
501                         continue;
502                 }
503                 /* dont check nodes that are unavailable */
504                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
505                         continue;
506                 }
507
508                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
509                                          mem_ctx, &remote_dbmap);
510                 if (ret != 0) {
511                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
512                         return -1;
513                 }
514
515                 /* step through all databases on the remote node */
516                 for (db=0; db<remote_dbmap->num;db++) {
517                         const char *name;
518
519                         for (i=0;i<(*dbmap)->num;i++) {
520                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
521                                         break;
522                                 }
523                         }
524                         /* we already have this db locally */
525                         if (i!=(*dbmap)->num) {
526                                 continue;
527                         }
528                         /* ok so we need to create this database and
529                            rebuild dbmap
530                          */
531                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
532                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
533                         if (ret != 0) {
534                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
535                                           nodemap->nodes[j].pnn));
536                                 return -1;
537                         }
538                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
539                                            remote_dbmap->dbs[db].persistent);
540                         if (ret != 0) {
541                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
542                                 return -1;
543                         }
544                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
545                         if (ret != 0) {
546                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
547                                 return -1;
548                         }
549                 }
550         }
551
552         return 0;
553 }
554
555
556 /*
557   pull the remote database contents from one node into the recdb
558  */
559 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
560                                     struct tdb_wrap *recdb, uint32_t dbid)
561 {
562         int ret;
563         TDB_DATA outdata;
564         struct ctdb_marshall_buffer *reply;
565         struct ctdb_rec_data *rec;
566         int i;
567         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
568
569         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
570                                CONTROL_TIMEOUT(), &outdata);
571         if (ret != 0) {
572                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
573                 talloc_free(tmp_ctx);
574                 return -1;
575         }
576
577         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
578
579         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
580                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
581                 talloc_free(tmp_ctx);
582                 return -1;
583         }
584         
585         rec = (struct ctdb_rec_data *)&reply->data[0];
586         
587         for (i=0;
588              i<reply->count;
589              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
590                 TDB_DATA key, data;
591                 struct ctdb_ltdb_header *hdr;
592                 TDB_DATA existing;
593                 
594                 key.dptr = &rec->data[0];
595                 key.dsize = rec->keylen;
596                 data.dptr = &rec->data[key.dsize];
597                 data.dsize = rec->datalen;
598                 
599                 hdr = (struct ctdb_ltdb_header *)data.dptr;
600
601                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
602                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
603                         talloc_free(tmp_ctx);
604                         return -1;
605                 }
606
607                 /* fetch the existing record, if any */
608                 existing = tdb_fetch(recdb->tdb, key);
609                 
610                 if (existing.dptr != NULL) {
611                         struct ctdb_ltdb_header header;
612                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
613                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
614                                          (unsigned)existing.dsize, srcnode));
615                                 free(existing.dptr);
616                                 talloc_free(tmp_ctx);
617                                 return -1;
618                         }
619                         header = *(struct ctdb_ltdb_header *)existing.dptr;
620                         free(existing.dptr);
621                         if (!(header.rsn < hdr->rsn ||
622                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
623                                 continue;
624                         }
625                 }
626                 
627                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
628                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
629                         talloc_free(tmp_ctx);
630                         return -1;                              
631                 }
632         }
633
634         talloc_free(tmp_ctx);
635
636         return 0;
637 }
638
639 /*
640   pull all the remote database contents into the recdb
641  */
642 static int pull_remote_database(struct ctdb_context *ctdb,
643                                 struct ctdb_recoverd *rec, 
644                                 struct ctdb_node_map *nodemap, 
645                                 struct tdb_wrap *recdb, uint32_t dbid)
646 {
647         int j;
648
649         /* pull all records from all other nodes across onto this node
650            (this merges based on rsn)
651         */
652         for (j=0; j<nodemap->num; j++) {
653                 /* dont merge from nodes that are unavailable */
654                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
655                         continue;
656                 }
657                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
658                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
659                                  nodemap->nodes[j].pnn));
660                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
661                         return -1;
662                 }
663         }
664         
665         return 0;
666 }
667
668
669 /*
670   update flags on all active nodes
671  */
672 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
673 {
674         int ret;
675
676         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
677                 if (ret != 0) {
678                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
679                 return -1;
680         }
681
682         return 0;
683 }
684
685 /*
686   ensure all nodes have the same vnnmap we do
687  */
688 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
689                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
690 {
691         int j, ret;
692
693         /* push the new vnn map out to all the nodes */
694         for (j=0; j<nodemap->num; j++) {
695                 /* dont push to nodes that are unavailable */
696                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
697                         continue;
698                 }
699
700                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
701                 if (ret != 0) {
702                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
703                         return -1;
704                 }
705         }
706
707         return 0;
708 }
709
710
711 /*
712   handler for when the admin bans a node
713 */
714 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
715                         TDB_DATA data, void *private_data)
716 {
717         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
718         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
719         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
720
721         if (data.dsize != sizeof(*b)) {
722                 DEBUG(DEBUG_ERR,("Bad data in ban_handler\n"));
723                 talloc_free(mem_ctx);
724                 return;
725         }
726
727         if (b->pnn != ctdb->pnn) {
728                 DEBUG(DEBUG_ERR,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
729                 return;
730         }
731
732         DEBUG(DEBUG_NOTICE,("Node %u has been banned for %u seconds\n", 
733                  b->pnn, b->ban_time));
734
735         ctdb_ban_node(rec, b->pnn, b->ban_time);
736         talloc_free(mem_ctx);
737 }
738
739 /*
740   handler for when the admin unbans a node
741 */
742 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
743                           TDB_DATA data, void *private_data)
744 {
745         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
746         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
747         uint32_t pnn;
748
749         if (data.dsize != sizeof(uint32_t)) {
750                 DEBUG(DEBUG_ERR,("Bad data in unban_handler\n"));
751                 talloc_free(mem_ctx);
752                 return;
753         }
754         pnn = *(uint32_t *)data.dptr;
755
756         if (pnn != ctdb->pnn) {
757                 DEBUG(DEBUG_ERR,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
758                 return;
759         }
760
761         DEBUG(DEBUG_NOTICE,("Node %u has been unbanned.\n", pnn));
762         ctdb_unban_node(rec, pnn);
763         talloc_free(mem_ctx);
764 }
765
766
767 struct vacuum_info {
768         struct vacuum_info *next, *prev;
769         struct ctdb_recoverd *rec;
770         uint32_t srcnode;
771         struct ctdb_db_context *ctdb_db;
772         struct ctdb_marshall_buffer *recs;
773         struct ctdb_rec_data *r;
774 };
775
776 static void vacuum_fetch_next(struct vacuum_info *v);
777
778 /*
779   called when a vacuum fetch has completed - just free it and do the next one
780  */
781 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
782 {
783         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
784         talloc_free(state);
785         vacuum_fetch_next(v);
786 }
787
788
789 /*
790   process the next element from the vacuum list
791 */
792 static void vacuum_fetch_next(struct vacuum_info *v)
793 {
794         struct ctdb_call call;
795         struct ctdb_rec_data *r;
796
797         while (v->recs->count) {
798                 struct ctdb_client_call_state *state;
799                 TDB_DATA data;
800                 struct ctdb_ltdb_header *hdr;
801
802                 ZERO_STRUCT(call);
803                 call.call_id = CTDB_NULL_FUNC;
804                 call.flags = CTDB_IMMEDIATE_MIGRATION;
805
806                 r = v->r;
807                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
808                 v->recs->count--;
809
810                 call.key.dptr = &r->data[0];
811                 call.key.dsize = r->keylen;
812
813                 /* ensure we don't block this daemon - just skip a record if we can't get
814                    the chainlock */
815                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
816                         continue;
817                 }
818
819                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
820                 if (data.dptr == NULL) {
821                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
822                         continue;
823                 }
824
825                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
826                         free(data.dptr);
827                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
828                         continue;
829                 }
830                 
831                 hdr = (struct ctdb_ltdb_header *)data.dptr;
832                 if (hdr->dmaster == v->rec->ctdb->pnn) {
833                         /* its already local */
834                         free(data.dptr);
835                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
836                         continue;
837                 }
838
839                 free(data.dptr);
840
841                 state = ctdb_call_send(v->ctdb_db, &call);
842                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
843                 if (state == NULL) {
844                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
845                         talloc_free(v);
846                         return;
847                 }
848                 state->async.fn = vacuum_fetch_callback;
849                 state->async.private_data = v;
850                 return;
851         }
852
853         talloc_free(v);
854 }
855
856
857 /*
858   destroy a vacuum info structure
859  */
860 static int vacuum_info_destructor(struct vacuum_info *v)
861 {
862         DLIST_REMOVE(v->rec->vacuum_info, v);
863         return 0;
864 }
865
866
867 /*
868   handler for vacuum fetch
869 */
870 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
871                                  TDB_DATA data, void *private_data)
872 {
873         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
874         struct ctdb_marshall_buffer *recs;
875         int ret, i;
876         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
877         const char *name;
878         struct ctdb_dbid_map *dbmap=NULL;
879         bool persistent = false;
880         struct ctdb_db_context *ctdb_db;
881         struct ctdb_rec_data *r;
882         uint32_t srcnode;
883         struct vacuum_info *v;
884
885         recs = (struct ctdb_marshall_buffer *)data.dptr;
886         r = (struct ctdb_rec_data *)&recs->data[0];
887
888         if (recs->count == 0) {
889                 talloc_free(tmp_ctx);
890                 return;
891         }
892
893         srcnode = r->reqid;
894
895         for (v=rec->vacuum_info;v;v=v->next) {
896                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
897                         /* we're already working on records from this node */
898                         talloc_free(tmp_ctx);
899                         return;
900                 }
901         }
902
903         /* work out if the database is persistent */
904         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
905         if (ret != 0) {
906                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
907                 talloc_free(tmp_ctx);
908                 return;
909         }
910
911         for (i=0;i<dbmap->num;i++) {
912                 if (dbmap->dbs[i].dbid == recs->db_id) {
913                         persistent = dbmap->dbs[i].persistent;
914                         break;
915                 }
916         }
917         if (i == dbmap->num) {
918                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
919                 talloc_free(tmp_ctx);
920                 return;         
921         }
922
923         /* find the name of this database */
924         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
925                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
926                 talloc_free(tmp_ctx);
927                 return;
928         }
929
930         /* attach to it */
931         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
932         if (ctdb_db == NULL) {
933                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
934                 talloc_free(tmp_ctx);
935                 return;
936         }
937
938         v = talloc_zero(rec, struct vacuum_info);
939         if (v == NULL) {
940                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
941                 talloc_free(tmp_ctx);
942                 return;
943         }
944
945         v->rec = rec;
946         v->srcnode = srcnode;
947         v->ctdb_db = ctdb_db;
948         v->recs = talloc_memdup(v, recs, data.dsize);
949         if (v->recs == NULL) {
950                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
951                 talloc_free(v);
952                 talloc_free(tmp_ctx);
953                 return;         
954         }
955         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
956
957         DLIST_ADD(rec->vacuum_info, v);
958
959         talloc_set_destructor(v, vacuum_info_destructor);
960
961         vacuum_fetch_next(v);
962         talloc_free(tmp_ctx);
963 }
964
965
966 /*
967   called when ctdb_wait_timeout should finish
968  */
969 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
970                               struct timeval yt, void *p)
971 {
972         uint32_t *timed_out = (uint32_t *)p;
973         (*timed_out) = 1;
974 }
975
976 /*
977   wait for a given number of seconds
978  */
979 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
980 {
981         uint32_t timed_out = 0;
982         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
983         while (!timed_out) {
984                 event_loop_once(ctdb->ev);
985         }
986 }
987
988 /*
989   called when an election times out (ends)
990  */
991 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
992                                   struct timeval t, void *p)
993 {
994         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
995         rec->election_timeout = NULL;
996 }
997
998
999 /*
1000   wait for an election to finish. It finished election_timeout seconds after
1001   the last election packet is received
1002  */
1003 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1004 {
1005         struct ctdb_context *ctdb = rec->ctdb;
1006         while (rec->election_timeout) {
1007                 event_loop_once(ctdb->ev);
1008         }
1009 }
1010
1011 /*
1012   Update our local flags from all remote connected nodes. 
1013   This is only run when we are or we belive we are the recovery master
1014  */
1015 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1016 {
1017         int j;
1018         struct ctdb_context *ctdb = rec->ctdb;
1019         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1020
1021         /* get the nodemap for all active remote nodes and verify
1022            they are the same as for this node
1023          */
1024         for (j=0; j<nodemap->num; j++) {
1025                 struct ctdb_node_map *remote_nodemap=NULL;
1026                 int ret;
1027
1028                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1029                         continue;
1030                 }
1031                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1032                         continue;
1033                 }
1034
1035                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1036                                            mem_ctx, &remote_nodemap);
1037                 if (ret != 0) {
1038                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1039                                   nodemap->nodes[j].pnn));
1040                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1041                         talloc_free(mem_ctx);
1042                         return MONITOR_FAILED;
1043                 }
1044                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1045                         int ban_changed = (nodemap->nodes[j].flags ^ remote_nodemap->nodes[j].flags) & NODE_FLAGS_BANNED;
1046
1047                         if (ban_changed) {
1048                                 DEBUG(DEBUG_NOTICE,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
1049                                 nodemap->nodes[j].pnn,
1050                                 remote_nodemap->nodes[j].flags,
1051                                 nodemap->nodes[j].flags));
1052                         }
1053
1054                         /* We should tell our daemon about this so it
1055                            updates its flags or else we will log the same 
1056                            message again in the next iteration of recovery.
1057                            Since we are the recovery master we can just as
1058                            well update the flags on all nodes.
1059                         */
1060                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
1061                         if (ret != 0) {
1062                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1063                                 return -1;
1064                         }
1065
1066                         /* Update our local copy of the flags in the recovery
1067                            daemon.
1068                         */
1069                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1070                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1071                                  nodemap->nodes[j].flags));
1072                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1073
1074                         /* If the BANNED flag has changed for the node
1075                            this is a good reason to do a new election.
1076                          */
1077                         if (ban_changed) {
1078                                 talloc_free(mem_ctx);
1079                                 return MONITOR_ELECTION_NEEDED;
1080                         }
1081
1082                 }
1083                 talloc_free(remote_nodemap);
1084         }
1085         talloc_free(mem_ctx);
1086         return MONITOR_OK;
1087 }
1088
1089
1090 /* Create a new random generation ip. 
1091    The generation id can not be the INVALID_GENERATION id
1092 */
1093 static uint32_t new_generation(void)
1094 {
1095         uint32_t generation;
1096
1097         while (1) {
1098                 generation = random();
1099
1100                 if (generation != INVALID_GENERATION) {
1101                         break;
1102                 }
1103         }
1104
1105         return generation;
1106 }
1107
1108
1109 /*
1110   create a temporary working database
1111  */
1112 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1113 {
1114         char *name;
1115         struct tdb_wrap *recdb;
1116         unsigned tdb_flags;
1117
1118         /* open up the temporary recovery database */
1119         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
1120         if (name == NULL) {
1121                 return NULL;
1122         }
1123         unlink(name);
1124
1125         tdb_flags = TDB_NOLOCK;
1126         if (!ctdb->do_setsched) {
1127                 tdb_flags |= TDB_NOMMAP;
1128         }
1129
1130         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1131                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1132         if (recdb == NULL) {
1133                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1134         }
1135
1136         talloc_free(name);
1137
1138         return recdb;
1139 }
1140
1141
1142 /* 
1143    a traverse function for pulling all relevent records from recdb
1144  */
1145 struct recdb_data {
1146         struct ctdb_context *ctdb;
1147         struct ctdb_marshall_buffer *recdata;
1148         uint32_t len;
1149         bool failed;
1150 };
1151
1152 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1153 {
1154         struct recdb_data *params = (struct recdb_data *)p;
1155         struct ctdb_rec_data *rec;
1156         struct ctdb_ltdb_header *hdr;
1157
1158         /* skip empty records */
1159         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1160                 return 0;
1161         }
1162
1163         /* update the dmaster field to point to us */
1164         hdr = (struct ctdb_ltdb_header *)data.dptr;
1165         hdr->dmaster = params->ctdb->pnn;
1166
1167         /* add the record to the blob ready to send to the nodes */
1168         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1169         if (rec == NULL) {
1170                 params->failed = true;
1171                 return -1;
1172         }
1173         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1174         if (params->recdata == NULL) {
1175                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1176                          rec->length + params->len, params->recdata->count));
1177                 params->failed = true;
1178                 return -1;
1179         }
1180         params->recdata->count++;
1181         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1182         params->len += rec->length;
1183         talloc_free(rec);
1184
1185         return 0;
1186 }
1187
1188 /*
1189   push the recdb database out to all nodes
1190  */
1191 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1192                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1193 {
1194         struct recdb_data params;
1195         struct ctdb_marshall_buffer *recdata;
1196         TDB_DATA outdata;
1197         TALLOC_CTX *tmp_ctx;
1198         uint32_t *nodes;
1199
1200         tmp_ctx = talloc_new(ctdb);
1201         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1202
1203         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1204         CTDB_NO_MEMORY(ctdb, recdata);
1205
1206         recdata->db_id = dbid;
1207
1208         params.ctdb = ctdb;
1209         params.recdata = recdata;
1210         params.len = offsetof(struct ctdb_marshall_buffer, data);
1211         params.failed = false;
1212
1213         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1214                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1215                 talloc_free(params.recdata);
1216                 talloc_free(tmp_ctx);
1217                 return -1;
1218         }
1219
1220         if (params.failed) {
1221                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1222                 talloc_free(params.recdata);
1223                 talloc_free(tmp_ctx);
1224                 return -1;              
1225         }
1226
1227         recdata = params.recdata;
1228
1229         outdata.dptr = (void *)recdata;
1230         outdata.dsize = params.len;
1231
1232         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1233         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1234                                         nodes,
1235                                         CONTROL_TIMEOUT(), false, outdata,
1236                                         NULL, NULL,
1237                                         NULL) != 0) {
1238                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1239                 talloc_free(recdata);
1240                 talloc_free(tmp_ctx);
1241                 return -1;
1242         }
1243
1244         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1245                   dbid, recdata->count));
1246
1247         talloc_free(recdata);
1248         talloc_free(tmp_ctx);
1249
1250         return 0;
1251 }
1252
1253
1254 /*
1255   go through a full recovery on one database 
1256  */
1257 static int recover_database(struct ctdb_recoverd *rec, 
1258                             TALLOC_CTX *mem_ctx,
1259                             uint32_t dbid,
1260                             uint32_t pnn, 
1261                             struct ctdb_node_map *nodemap,
1262                             uint32_t transaction_id)
1263 {
1264         struct tdb_wrap *recdb;
1265         int ret;
1266         struct ctdb_context *ctdb = rec->ctdb;
1267         TDB_DATA data;
1268         struct ctdb_control_wipe_database w;
1269         uint32_t *nodes;
1270
1271         recdb = create_recdb(ctdb, mem_ctx);
1272         if (recdb == NULL) {
1273                 return -1;
1274         }
1275
1276         /* pull all remote databases onto the recdb */
1277         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid);
1278         if (ret != 0) {
1279                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1280                 return -1;
1281         }
1282
1283         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1284
1285         /* wipe all the remote databases. This is safe as we are in a transaction */
1286         w.db_id = dbid;
1287         w.transaction_id = transaction_id;
1288
1289         data.dptr = (void *)&w;
1290         data.dsize = sizeof(w);
1291
1292         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1293         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1294                                         nodes,
1295                                         CONTROL_TIMEOUT(), false, data,
1296                                         NULL, NULL,
1297                                         NULL) != 0) {
1298                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1299                 talloc_free(recdb);
1300                 return -1;
1301         }
1302         
1303         /* push out the correct database. This sets the dmaster and skips 
1304            the empty records */
1305         ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
1306         if (ret != 0) {
1307                 talloc_free(recdb);
1308                 return -1;
1309         }
1310
1311         /* all done with this database */
1312         talloc_free(recdb);
1313
1314         return 0;
1315 }
1316
1317 /*
1318   reload the nodes file 
1319 */
1320 static void reload_nodes_file(struct ctdb_context *ctdb)
1321 {
1322         ctdb->nodes = NULL;
1323         ctdb_load_nodes_file(ctdb);
1324 }
1325
1326         
1327 /*
1328   we are the recmaster, and recovery is needed - start a recovery run
1329  */
1330 static int do_recovery(struct ctdb_recoverd *rec, 
1331                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1332                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
1333                        int32_t culprit)
1334 {
1335         struct ctdb_context *ctdb = rec->ctdb;
1336         int i, j, ret;
1337         uint32_t generation;
1338         struct ctdb_dbid_map *dbmap;
1339         TDB_DATA data;
1340         uint32_t *nodes;
1341         struct timeval start_time;
1342
1343         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1344
1345         /* if recovery fails, force it again */
1346         rec->need_recovery = true;
1347
1348         if (culprit != -1) {
1349                 ctdb_set_culprit(rec, culprit);
1350         }
1351
1352         if (rec->culprit_counter > 2*nodemap->num) {
1353                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
1354                          rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
1355                          ctdb->tunable.recovery_ban_period));
1356                 ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
1357         }
1358
1359
1360         if (ctdb->tunable.verify_recovery_lock != 0) {
1361                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1362                 start_time = timeval_current();
1363                 if (!ctdb_recovery_lock(ctdb, true)) {
1364                         ctdb_set_culprit(rec, pnn);
1365                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1366                         return -1;
1367                 }
1368                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1369                 DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1370         }
1371
1372         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
1373
1374         /* get a list of all databases */
1375         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1376         if (ret != 0) {
1377                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1378                 return -1;
1379         }
1380
1381         /* we do the db creation before we set the recovery mode, so the freeze happens
1382            on all databases we will be dealing with. */
1383
1384         /* verify that we have all the databases any other node has */
1385         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1386         if (ret != 0) {
1387                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1388                 return -1;
1389         }
1390
1391         /* verify that all other nodes have all our databases */
1392         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1393         if (ret != 0) {
1394                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1395                 return -1;
1396         }
1397
1398         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1399
1400
1401         /* set recovery mode to active on all nodes */
1402         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1403         if (ret != 0) {
1404                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1405                 return -1;
1406         }
1407
1408         /* execute the "startrecovery" event script on all nodes */
1409         ret = run_startrecovery_eventscript(rec, nodemap);
1410         if (ret!=0) {
1411                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1412                 return -1;
1413         }
1414
1415         /* pick a new generation number */
1416         generation = new_generation();
1417
1418         /* change the vnnmap on this node to use the new generation 
1419            number but not on any other nodes.
1420            this guarantees that if we abort the recovery prematurely
1421            for some reason (a node stops responding?)
1422            that we can just return immediately and we will reenter
1423            recovery shortly again.
1424            I.e. we deliberately leave the cluster with an inconsistent
1425            generation id to allow us to abort recovery at any stage and
1426            just restart it from scratch.
1427          */
1428         vnnmap->generation = generation;
1429         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1430         if (ret != 0) {
1431                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1432                 return -1;
1433         }
1434
1435         data.dptr = (void *)&generation;
1436         data.dsize = sizeof(uint32_t);
1437
1438         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1439         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1440                                         nodes,
1441                                         CONTROL_TIMEOUT(), false, data,
1442                                         NULL, NULL,
1443                                         NULL) != 0) {
1444                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1445                 return -1;
1446         }
1447
1448         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1449
1450         for (i=0;i<dbmap->num;i++) {
1451                 if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
1452                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1453                         return -1;
1454                 }
1455         }
1456
1457         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1458
1459         /* commit all the changes */
1460         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1461                                         nodes,
1462                                         CONTROL_TIMEOUT(), false, data,
1463                                         NULL, NULL,
1464                                         NULL) != 0) {
1465                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1466                 return -1;
1467         }
1468
1469         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1470         
1471
1472         /* update the capabilities for all nodes */
1473         ret = update_capabilities(ctdb, nodemap);
1474         if (ret!=0) {
1475                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1476                 return -1;
1477         }
1478
1479         /* build a new vnn map with all the currently active and
1480            unbanned nodes */
1481         generation = new_generation();
1482         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1483         CTDB_NO_MEMORY(ctdb, vnnmap);
1484         vnnmap->generation = generation;
1485         vnnmap->size = 0;
1486         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1487         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1488         for (i=j=0;i<nodemap->num;i++) {
1489                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1490                         continue;
1491                 }
1492                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1493                         /* this node can not be an lmaster */
1494                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1495                         continue;
1496                 }
1497
1498                 vnnmap->size++;
1499                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1500                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1501                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1502
1503         }
1504         if (vnnmap->size == 0) {
1505                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1506                 vnnmap->size++;
1507                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1508                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1509                 vnnmap->map[0] = pnn;
1510         }       
1511
1512         /* update to the new vnnmap on all nodes */
1513         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1514         if (ret != 0) {
1515                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1516                 return -1;
1517         }
1518
1519         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1520
1521         /* update recmaster to point to us for all nodes */
1522         ret = set_recovery_master(ctdb, nodemap, pnn);
1523         if (ret!=0) {
1524                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1525                 return -1;
1526         }
1527
1528         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1529
1530         /*
1531           update all nodes to have the same flags that we have
1532          */
1533         for (i=0;i<nodemap->num;i++) {
1534                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1535                         continue;
1536                 }
1537
1538                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1539                 if (ret != 0) {
1540                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1541                         return -1;
1542                 }
1543         }
1544
1545         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1546
1547         /* disable recovery mode */
1548         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
1549         if (ret != 0) {
1550                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1551                 return -1;
1552         }
1553
1554         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1555
1556         /*
1557           tell nodes to takeover their public IPs
1558          */
1559         rec->need_takeover_run = false;
1560         ret = ctdb_takeover_run(ctdb, nodemap);
1561         if (ret != 0) {
1562                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1563                 return -1;
1564         }
1565         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1566
1567         /* execute the "recovered" event script on all nodes */
1568         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1569         if (ret!=0) {
1570                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1571                 return -1;
1572         }
1573
1574         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1575
1576         /* send a message to all clients telling them that the cluster 
1577            has been reconfigured */
1578         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1579
1580         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1581
1582         rec->need_recovery = false;
1583
1584         /* We just finished a recovery successfully. 
1585            We now wait for rerecovery_timeout before we allow 
1586            another recovery to take place.
1587         */
1588         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1589         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1590         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1591
1592         return 0;
1593 }
1594
1595
1596 /*
1597   elections are won by first checking the number of connected nodes, then
1598   the priority time, then the pnn
1599  */
1600 struct election_message {
1601         uint32_t num_connected;
1602         struct timeval priority_time;
1603         uint32_t pnn;
1604         uint32_t node_flags;
1605 };
1606
1607 /*
1608   form this nodes election data
1609  */
1610 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1611 {
1612         int ret, i;
1613         struct ctdb_node_map *nodemap;
1614         struct ctdb_context *ctdb = rec->ctdb;
1615
1616         ZERO_STRUCTP(em);
1617
1618         em->pnn = rec->ctdb->pnn;
1619         em->priority_time = rec->priority_time;
1620         em->node_flags = rec->node_flags;
1621
1622         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1623         if (ret != 0) {
1624                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1625                 return;
1626         }
1627
1628         for (i=0;i<nodemap->num;i++) {
1629                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1630                         em->num_connected++;
1631                 }
1632         }
1633
1634         /* we shouldnt try to win this election if we cant be a recmaster */
1635         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1636                 em->num_connected = 0;
1637                 em->priority_time = timeval_current();
1638         }
1639
1640         talloc_free(nodemap);
1641 }
1642
1643 /*
1644   see if the given election data wins
1645  */
1646 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1647 {
1648         struct election_message myem;
1649         int cmp = 0;
1650
1651         ctdb_election_data(rec, &myem);
1652
1653         /* we cant win if we dont have the recmaster capability */
1654         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1655                 return false;
1656         }
1657
1658         /* we cant win if we are banned */
1659         if (rec->node_flags & NODE_FLAGS_BANNED) {
1660                 return false;
1661         }       
1662
1663         /* we will automatically win if the other node is banned */
1664         if (em->node_flags & NODE_FLAGS_BANNED) {
1665                 return true;
1666         }
1667
1668         /* try to use the most connected node */
1669         if (cmp == 0) {
1670                 cmp = (int)myem.num_connected - (int)em->num_connected;
1671         }
1672
1673         /* then the longest running node */
1674         if (cmp == 0) {
1675                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1676         }
1677
1678         if (cmp == 0) {
1679                 cmp = (int)myem.pnn - (int)em->pnn;
1680         }
1681
1682         return cmp > 0;
1683 }
1684
1685 /*
1686   send out an election request
1687  */
1688 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1689 {
1690         int ret;
1691         TDB_DATA election_data;
1692         struct election_message emsg;
1693         uint64_t srvid;
1694         struct ctdb_context *ctdb = rec->ctdb;
1695
1696         srvid = CTDB_SRVID_RECOVERY;
1697
1698         ctdb_election_data(rec, &emsg);
1699
1700         election_data.dsize = sizeof(struct election_message);
1701         election_data.dptr  = (unsigned char *)&emsg;
1702
1703
1704         /* send an election message to all active nodes */
1705         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1706
1707
1708         /* A new node that is already frozen has entered the cluster.
1709            The existing nodes are not frozen and dont need to be frozen
1710            until the election has ended and we start the actual recovery
1711         */
1712         if (update_recmaster == true) {
1713                 /* first we assume we will win the election and set 
1714                    recoverymaster to be ourself on the current node
1715                  */
1716                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1717                 if (ret != 0) {
1718                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1719                         return -1;
1720                 }
1721         }
1722
1723
1724         return 0;
1725 }
1726
1727 /*
1728   this function will unban all nodes in the cluster
1729 */
1730 static void unban_all_nodes(struct ctdb_context *ctdb)
1731 {
1732         int ret, i;
1733         struct ctdb_node_map *nodemap;
1734         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1735         
1736         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1737         if (ret != 0) {
1738                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1739                 return;
1740         }
1741
1742         for (i=0;i<nodemap->num;i++) {
1743                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1744                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1745                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1746                 }
1747         }
1748
1749         talloc_free(tmp_ctx);
1750 }
1751
1752
1753 /*
1754   we think we are winning the election - send a broadcast election request
1755  */
1756 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1757 {
1758         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1759         int ret;
1760
1761         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1762         if (ret != 0) {
1763                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1764         }
1765
1766         talloc_free(rec->send_election_te);
1767         rec->send_election_te = NULL;
1768 }
1769
1770 /*
1771   handler for memory dumps
1772 */
1773 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1774                              TDB_DATA data, void *private_data)
1775 {
1776         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1777         TDB_DATA *dump;
1778         int ret;
1779         struct rd_memdump_reply *rd;
1780
1781         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1782                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1783                 talloc_free(tmp_ctx);
1784                 return;
1785         }
1786         rd = (struct rd_memdump_reply *)data.dptr;
1787
1788         dump = talloc_zero(tmp_ctx, TDB_DATA);
1789         if (dump == NULL) {
1790                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1791                 talloc_free(tmp_ctx);
1792                 return;
1793         }
1794         ret = ctdb_dump_memory(ctdb, dump);
1795         if (ret != 0) {
1796                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1797                 talloc_free(tmp_ctx);
1798                 return;
1799         }
1800
1801 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1802
1803         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1804         if (ret != 0) {
1805                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1806                 talloc_free(tmp_ctx);
1807                 return;
1808         }
1809
1810         talloc_free(tmp_ctx);
1811 }
1812
1813 /*
1814   handler for reload_nodes
1815 */
1816 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1817                              TDB_DATA data, void *private_data)
1818 {
1819         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1820
1821         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1822
1823         reload_nodes_file(rec->ctdb);
1824 }
1825
1826 /*
1827   handler for ip reallocate, just add it to the list of callers and 
1828   handle this later in the monitor_cluster loop so we do not recurse
1829   with other callers to takeover_run()
1830 */
1831 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1832                              TDB_DATA data, void *private_data)
1833 {
1834         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1835         struct ip_reallocate_list *caller;
1836
1837         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1838                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1839                 return;
1840         }
1841
1842         if (rec->ip_reallocate_ctx == NULL) {
1843                 rec->ip_reallocate_ctx = talloc_new(rec);
1844                 CTDB_NO_MEMORY_FATAL(ctdb, caller);
1845         }
1846
1847         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1848         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1849
1850         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1851         caller->next = rec->reallocate_callers;
1852         rec->reallocate_callers = caller;
1853
1854         return;
1855 }
1856
1857 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1858 {
1859         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1860         TDB_DATA result;
1861         int32_t ret;
1862         struct ip_reallocate_list *callers;
1863
1864         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1865         ret = ctdb_takeover_run(ctdb, rec->nodemap);
1866         result.dsize = sizeof(int32_t);
1867         result.dptr  = (uint8_t *)&ret;
1868
1869         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
1870                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to %u:%lu\n", callers->rd->pnn, callers->rd->srvid));
1871                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
1872                 if (ret != 0) {
1873                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply message to %u:%lu\n", callers->rd->pnn, callers->rd->srvid));
1874                 }
1875         }
1876
1877         talloc_free(tmp_ctx);
1878         talloc_free(rec->ip_reallocate_ctx);
1879         rec->ip_reallocate_ctx = NULL;
1880         rec->reallocate_callers = NULL;
1881         
1882 }
1883
1884
1885 /*
1886   handler for recovery master elections
1887 */
1888 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1889                              TDB_DATA data, void *private_data)
1890 {
1891         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1892         int ret;
1893         struct election_message *em = (struct election_message *)data.dptr;
1894         TALLOC_CTX *mem_ctx;
1895
1896         /* we got an election packet - update the timeout for the election */
1897         talloc_free(rec->election_timeout);
1898         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1899                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1900                                                 ctdb_election_timeout, rec);
1901
1902         mem_ctx = talloc_new(ctdb);
1903
1904         /* someone called an election. check their election data
1905            and if we disagree and we would rather be the elected node, 
1906            send a new election message to all other nodes
1907          */
1908         if (ctdb_election_win(rec, em)) {
1909                 if (!rec->send_election_te) {
1910                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1911                                                                 timeval_current_ofs(0, 500000),
1912                                                                 election_send_request, rec);
1913                 }
1914                 talloc_free(mem_ctx);
1915                 /*unban_all_nodes(ctdb);*/
1916                 return;
1917         }
1918         
1919         /* we didn't win */
1920         talloc_free(rec->send_election_te);
1921         rec->send_election_te = NULL;
1922
1923         if (ctdb->tunable.verify_recovery_lock != 0) {
1924                 /* release the recmaster lock */
1925                 if (em->pnn != ctdb->pnn &&
1926                     ctdb->recovery_lock_fd != -1) {
1927                         close(ctdb->recovery_lock_fd);
1928                         ctdb->recovery_lock_fd = -1;
1929                         unban_all_nodes(ctdb);
1930                 }
1931         }
1932
1933         /* ok, let that guy become recmaster then */
1934         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1935         if (ret != 0) {
1936                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
1937                 talloc_free(mem_ctx);
1938                 return;
1939         }
1940
1941         /* release any bans */
1942         rec->last_culprit = (uint32_t)-1;
1943         talloc_free(rec->banned_nodes);
1944         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1945         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1946
1947         talloc_free(mem_ctx);
1948         return;
1949 }
1950
1951
1952 /*
1953   force the start of the election process
1954  */
1955 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1956                            struct ctdb_node_map *nodemap)
1957 {
1958         int ret;
1959         struct ctdb_context *ctdb = rec->ctdb;
1960
1961         /* set all nodes to recovery mode to stop all internode traffic */
1962         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1963         if (ret != 0) {
1964                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1965                 return;
1966         }
1967
1968         talloc_free(rec->election_timeout);
1969         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1970                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1971                                                 ctdb_election_timeout, rec);
1972
1973         ret = send_election_request(rec, pnn, true);
1974         if (ret!=0) {
1975                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
1976                 return;
1977         }
1978
1979         /* wait for a few seconds to collect all responses */
1980         ctdb_wait_election(rec);
1981 }
1982
1983
1984
1985 /*
1986   handler for when a node changes its flags
1987 */
1988 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1989                             TDB_DATA data, void *private_data)
1990 {
1991         int ret;
1992         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1993         struct ctdb_node_map *nodemap=NULL;
1994         TALLOC_CTX *tmp_ctx;
1995         uint32_t changed_flags;
1996         int i;
1997         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1998
1999         if (data.dsize != sizeof(*c)) {
2000                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2001                 return;
2002         }
2003
2004         tmp_ctx = talloc_new(ctdb);
2005         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2006
2007         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2008         if (ret != 0) {
2009                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2010                 talloc_free(tmp_ctx);
2011                 return;         
2012         }
2013
2014
2015         for (i=0;i<nodemap->num;i++) {
2016                 if (nodemap->nodes[i].pnn == c->pnn) break;
2017         }
2018
2019         if (i == nodemap->num) {
2020                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2021                 talloc_free(tmp_ctx);
2022                 return;
2023         }
2024
2025         changed_flags = c->old_flags ^ c->new_flags;
2026
2027         if (nodemap->nodes[i].flags != c->new_flags) {
2028                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2029         }
2030
2031         nodemap->nodes[i].flags = c->new_flags;
2032
2033         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2034                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2035
2036         if (ret == 0) {
2037                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2038                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2039         }
2040         
2041         if (ret == 0 &&
2042             ctdb->recovery_master == ctdb->pnn &&
2043             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2044                 /* Only do the takeover run if the perm disabled or unhealthy
2045                    flags changed since these will cause an ip failover but not
2046                    a recovery.
2047                    If the node became disconnected or banned this will also
2048                    lead to an ip address failover but that is handled 
2049                    during recovery
2050                 */
2051                 if (changed_flags & NODE_FLAGS_DISABLED) {
2052                         rec->need_takeover_run = true;
2053                 }
2054         }
2055
2056         talloc_free(tmp_ctx);
2057 }
2058
2059 /*
2060   handler for when we need to push out flag changes ot all other nodes
2061 */
2062 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2063                             TDB_DATA data, void *private_data)
2064 {
2065         int ret;
2066         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2067
2068         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), c->pnn, c->new_flags, ~c->new_flags);
2069         if (ret != 0) {
2070                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2071         }
2072 }
2073
2074
2075 struct verify_recmode_normal_data {
2076         uint32_t count;
2077         enum monitor_result status;
2078 };
2079
2080 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2081 {
2082         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2083
2084
2085         /* one more node has responded with recmode data*/
2086         rmdata->count--;
2087
2088         /* if we failed to get the recmode, then return an error and let
2089            the main loop try again.
2090         */
2091         if (state->state != CTDB_CONTROL_DONE) {
2092                 if (rmdata->status == MONITOR_OK) {
2093                         rmdata->status = MONITOR_FAILED;
2094                 }
2095                 return;
2096         }
2097
2098         /* if we got a response, then the recmode will be stored in the
2099            status field
2100         */
2101         if (state->status != CTDB_RECOVERY_NORMAL) {
2102                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2103                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2104         }
2105
2106         return;
2107 }
2108
2109
2110 /* verify that all nodes are in normal recovery mode */
2111 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2112 {
2113         struct verify_recmode_normal_data *rmdata;
2114         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2115         struct ctdb_client_control_state *state;
2116         enum monitor_result status;
2117         int j;
2118         
2119         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2120         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2121         rmdata->count  = 0;
2122         rmdata->status = MONITOR_OK;
2123
2124         /* loop over all active nodes and send an async getrecmode call to 
2125            them*/
2126         for (j=0; j<nodemap->num; j++) {
2127                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2128                         continue;
2129                 }
2130                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2131                                         CONTROL_TIMEOUT(), 
2132                                         nodemap->nodes[j].pnn);
2133                 if (state == NULL) {
2134                         /* we failed to send the control, treat this as 
2135                            an error and try again next iteration
2136                         */                      
2137                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2138                         talloc_free(mem_ctx);
2139                         return MONITOR_FAILED;
2140                 }
2141
2142                 /* set up the callback functions */
2143                 state->async.fn = verify_recmode_normal_callback;
2144                 state->async.private_data = rmdata;
2145
2146                 /* one more control to wait for to complete */
2147                 rmdata->count++;
2148         }
2149
2150
2151         /* now wait for up to the maximum number of seconds allowed
2152            or until all nodes we expect a response from has replied
2153         */
2154         while (rmdata->count > 0) {
2155                 event_loop_once(ctdb->ev);
2156         }
2157
2158         status = rmdata->status;
2159         talloc_free(mem_ctx);
2160         return status;
2161 }
2162
2163
2164 struct verify_recmaster_data {
2165         struct ctdb_recoverd *rec;
2166         uint32_t count;
2167         uint32_t pnn;
2168         enum monitor_result status;
2169 };
2170
2171 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2172 {
2173         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2174
2175
2176         /* one more node has responded with recmaster data*/
2177         rmdata->count--;
2178
2179         /* if we failed to get the recmaster, then return an error and let
2180            the main loop try again.
2181         */
2182         if (state->state != CTDB_CONTROL_DONE) {
2183                 if (rmdata->status == MONITOR_OK) {
2184                         rmdata->status = MONITOR_FAILED;
2185                 }
2186                 return;
2187         }
2188
2189         /* if we got a response, then the recmaster will be stored in the
2190            status field
2191         */
2192         if (state->status != rmdata->pnn) {
2193                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2194                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2195                 rmdata->status = MONITOR_ELECTION_NEEDED;
2196         }
2197
2198         return;
2199 }
2200
2201
2202 /* verify that all nodes agree that we are the recmaster */
2203 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2204 {
2205         struct ctdb_context *ctdb = rec->ctdb;
2206         struct verify_recmaster_data *rmdata;
2207         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2208         struct ctdb_client_control_state *state;
2209         enum monitor_result status;
2210         int j;
2211         
2212         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2213         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2214         rmdata->rec    = rec;
2215         rmdata->count  = 0;
2216         rmdata->pnn    = pnn;
2217         rmdata->status = MONITOR_OK;
2218
2219         /* loop over all active nodes and send an async getrecmaster call to 
2220            them*/
2221         for (j=0; j<nodemap->num; j++) {
2222                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2223                         continue;
2224                 }
2225                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2226                                         CONTROL_TIMEOUT(),
2227                                         nodemap->nodes[j].pnn);
2228                 if (state == NULL) {
2229                         /* we failed to send the control, treat this as 
2230                            an error and try again next iteration
2231                         */                      
2232                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2233                         talloc_free(mem_ctx);
2234                         return MONITOR_FAILED;
2235                 }
2236
2237                 /* set up the callback functions */
2238                 state->async.fn = verify_recmaster_callback;
2239                 state->async.private_data = rmdata;
2240
2241                 /* one more control to wait for to complete */
2242                 rmdata->count++;
2243         }
2244
2245
2246         /* now wait for up to the maximum number of seconds allowed
2247            or until all nodes we expect a response from has replied
2248         */
2249         while (rmdata->count > 0) {
2250                 event_loop_once(ctdb->ev);
2251         }
2252
2253         status = rmdata->status;
2254         talloc_free(mem_ctx);
2255         return status;
2256 }
2257
2258
2259 /* called to check that the allocation of public ip addresses is ok.
2260 */
2261 static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
2262 {
2263         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2264         struct ctdb_all_public_ips *ips = NULL;
2265         struct ctdb_uptime *uptime1 = NULL;
2266         struct ctdb_uptime *uptime2 = NULL;
2267         int ret, j;
2268
2269         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2270                                 CTDB_CURRENT_NODE, &uptime1);
2271         if (ret != 0) {
2272                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2273                 talloc_free(mem_ctx);
2274                 return -1;
2275         }
2276
2277         /* read the ip allocation from the local node */
2278         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2279         if (ret != 0) {
2280                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2281                 talloc_free(mem_ctx);
2282                 return -1;
2283         }
2284
2285         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2286                                 CTDB_CURRENT_NODE, &uptime2);
2287         if (ret != 0) {
2288                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2289                 talloc_free(mem_ctx);
2290                 return -1;
2291         }
2292
2293         /* skip the check if the startrecovery time has changed */
2294         if (timeval_compare(&uptime1->last_recovery_started,
2295                             &uptime2->last_recovery_started) != 0) {
2296                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2297                 talloc_free(mem_ctx);
2298                 return 0;
2299         }
2300
2301         /* skip the check if the endrecovery time has changed */
2302         if (timeval_compare(&uptime1->last_recovery_finished,
2303                             &uptime2->last_recovery_finished) != 0) {
2304                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2305                 talloc_free(mem_ctx);
2306                 return 0;
2307         }
2308
2309         /* skip the check if we have started but not finished recovery */
2310         if (timeval_compare(&uptime1->last_recovery_finished,
2311                             &uptime1->last_recovery_started) != 1) {
2312                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery. skipping public ip address check\n"));
2313                 talloc_free(mem_ctx);
2314
2315                 return 0;
2316         }
2317
2318         /* verify that we have the ip addresses we should have
2319            and we dont have ones we shouldnt have.
2320            if we find an inconsistency we set recmode to
2321            active on the local node and wait for the recmaster
2322            to do a full blown recovery
2323         */
2324         for (j=0; j<ips->num; j++) {
2325                 if (ips->ips[j].pnn == pnn) {
2326                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2327                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2328                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2329                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2330                                 if (ret != 0) {
2331                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2332
2333                                         talloc_free(mem_ctx);
2334                                         return -1;
2335                                 }
2336                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2337                                 if (ret != 0) {
2338                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2339
2340                                         talloc_free(mem_ctx);
2341                                         return -1;
2342                                 }
2343                         }
2344                 } else {
2345                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2346                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2347                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2348
2349                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2350                                 if (ret != 0) {
2351                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2352
2353                                         talloc_free(mem_ctx);
2354                                         return -1;
2355                                 }
2356                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2357                                 if (ret != 0) {
2358                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2359
2360                                         talloc_free(mem_ctx);
2361                                         return -1;
2362                                 }
2363                         }
2364                 }
2365         }
2366
2367         talloc_free(mem_ctx);
2368         return 0;
2369 }
2370
2371
2372 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2373 {
2374         struct ctdb_node_map **remote_nodemaps = callback_data;
2375
2376         if (node_pnn >= ctdb->num_nodes) {
2377                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2378                 return;
2379         }
2380
2381         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2382
2383 }
2384
2385 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2386         struct ctdb_node_map *nodemap,
2387         struct ctdb_node_map **remote_nodemaps)
2388 {
2389         uint32_t *nodes;
2390
2391         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2392         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2393                                         nodes,
2394                                         CONTROL_TIMEOUT(), false, tdb_null,
2395                                         async_getnodemap_callback,
2396                                         NULL,
2397                                         remote_nodemaps) != 0) {
2398                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2399
2400                 return -1;
2401         }
2402
2403         return 0;
2404 }
2405
2406 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2407 struct ctdb_check_reclock_state {
2408         struct ctdb_context *ctdb;
2409         struct timeval start_time;
2410         int fd[2];
2411         pid_t child;
2412         struct timed_event *te;
2413         struct fd_event *fde;
2414         enum reclock_child_status status;
2415 };
2416
2417 /* when we free the reclock state we must kill any child process.
2418 */
2419 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2420 {
2421         struct ctdb_context *ctdb = state->ctdb;
2422
2423         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2424
2425         if (state->fd[0] != -1) {
2426                 close(state->fd[0]);
2427                 state->fd[0] = -1;
2428         }
2429         if (state->fd[1] != -1) {
2430                 close(state->fd[1]);
2431                 state->fd[1] = -1;
2432         }
2433         kill(state->child, SIGKILL);
2434         return 0;
2435 }
2436
2437 /*
2438   called if our check_reclock child times out. this would happen if
2439   i/o to the reclock file blocks.
2440  */
2441 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2442                                          struct timeval t, void *private_data)
2443 {
2444         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2445                                            struct ctdb_check_reclock_state);
2446
2447         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2448         state->status = RECLOCK_TIMEOUT;
2449 }
2450
2451 /* this is called when the child process has completed checking the reclock
2452    file and has written data back to us through the pipe.
2453 */
2454 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2455                              uint16_t flags, void *private_data)
2456 {
2457         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2458                                              struct ctdb_check_reclock_state);
2459         char c = 0;
2460         int ret;
2461
2462         /* we got a response from our child process so we can abort the
2463            timeout.
2464         */
2465         talloc_free(state->te);
2466         state->te = NULL;
2467
2468         ret = read(state->fd[0], &c, 1);
2469         if (ret != 1 || c != RECLOCK_OK) {
2470                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2471                 state->status = RECLOCK_FAILED;
2472
2473                 return;
2474         }
2475
2476         state->status = RECLOCK_OK;
2477         return;
2478 }
2479
2480 static int check_recovery_lock(struct ctdb_context *ctdb)
2481 {
2482         int ret;
2483         struct ctdb_check_reclock_state *state;
2484         pid_t parent = getpid();
2485
2486         if (ctdb->recovery_lock_fd == -1) {
2487                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2488                 return -1;
2489         }
2490
2491         state = talloc(ctdb, struct ctdb_check_reclock_state);
2492         CTDB_NO_MEMORY(ctdb, state);
2493
2494         state->ctdb = ctdb;
2495         state->start_time = timeval_current();
2496         state->status = RECLOCK_CHECKING;
2497         state->fd[0] = -1;
2498         state->fd[1] = -1;
2499
2500         ret = pipe(state->fd);
2501         if (ret != 0) {
2502                 talloc_free(state);
2503                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2504                 return -1;
2505         }
2506
2507         state->child = fork();
2508         if (state->child == (pid_t)-1) {
2509                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2510                 close(state->fd[0]);
2511                 state->fd[0] = -1;
2512                 close(state->fd[1]);
2513                 state->fd[1] = -1;
2514                 talloc_free(state);
2515                 return -1;
2516         }
2517
2518         if (state->child == 0) {
2519                 char cc = RECLOCK_OK;
2520                 close(state->fd[0]);
2521                 state->fd[0] = -1;
2522
2523                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2524                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2525                         cc = RECLOCK_FAILED;
2526                 }
2527
2528                 write(state->fd[1], &cc, 1);
2529                 /* make sure we die when our parent dies */
2530                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2531                         sleep(5);
2532                         write(state->fd[1], &cc, 1);
2533                 }
2534                 _exit(0);
2535         }
2536         close(state->fd[1]);
2537         state->fd[1] = -1;
2538
2539         talloc_set_destructor(state, check_reclock_destructor);
2540
2541         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2542                                     ctdb_check_reclock_timeout, state);
2543         if (state->te == NULL) {
2544                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2545                 talloc_free(state);
2546                 return -1;
2547         }
2548
2549         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2550                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2551                                 reclock_child_handler,
2552                                 (void *)state);
2553
2554         if (state->fde == NULL) {
2555                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2556                 talloc_free(state);
2557                 return -1;
2558         }
2559
2560         while (state->status == RECLOCK_CHECKING) {
2561                 event_loop_once(ctdb->ev);
2562         }
2563
2564         if (state->status == RECLOCK_FAILED) {
2565                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2566                 close(ctdb->recovery_lock_fd);
2567                 ctdb->recovery_lock_fd = -1;
2568                 talloc_free(state);
2569                 return -1;
2570         }
2571
2572         talloc_free(state);
2573         return 0;
2574 }
2575
2576 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2577 {
2578         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2579         const char *reclockfile;
2580
2581         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2582                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2583                 talloc_free(tmp_ctx);
2584                 return -1;      
2585         }
2586
2587         if (reclockfile == NULL) {
2588                 if (ctdb->recovery_lock_file != NULL) {
2589                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2590                         talloc_free(ctdb->recovery_lock_file);
2591                         ctdb->recovery_lock_file = NULL;
2592                         if (ctdb->recovery_lock_fd != -1) {
2593                                 close(ctdb->recovery_lock_fd);
2594                                 ctdb->recovery_lock_fd = -1;
2595                         }
2596                 }
2597                 ctdb->tunable.verify_recovery_lock = 0;
2598                 talloc_free(tmp_ctx);
2599                 return 0;
2600         }
2601
2602         if (ctdb->recovery_lock_file == NULL) {
2603                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2604                 if (ctdb->recovery_lock_fd != -1) {
2605                         close(ctdb->recovery_lock_fd);
2606                         ctdb->recovery_lock_fd = -1;
2607                 }
2608                 talloc_free(tmp_ctx);
2609                 return 0;
2610         }
2611
2612
2613         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2614                 talloc_free(tmp_ctx);
2615                 return 0;
2616         }
2617
2618         talloc_free(ctdb->recovery_lock_file);
2619         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2620         ctdb->tunable.verify_recovery_lock = 0;
2621         if (ctdb->recovery_lock_fd != -1) {
2622                 close(ctdb->recovery_lock_fd);
2623                 ctdb->recovery_lock_fd = -1;
2624         }
2625
2626         talloc_free(tmp_ctx);
2627         return 0;
2628 }
2629                 
2630 /*
2631   the main monitoring loop
2632  */
2633 static void monitor_cluster(struct ctdb_context *ctdb)
2634 {
2635         uint32_t pnn;
2636         TALLOC_CTX *mem_ctx=NULL;
2637         struct ctdb_node_map *nodemap=NULL;
2638         struct ctdb_node_map *recmaster_nodemap=NULL;
2639         struct ctdb_node_map **remote_nodemaps=NULL;
2640         struct ctdb_vnn_map *vnnmap=NULL;
2641         struct ctdb_vnn_map *remote_vnnmap=NULL;
2642         int32_t debug_level;
2643         int i, j, ret;
2644         struct ctdb_recoverd *rec;
2645
2646         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2647
2648         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2649         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2650
2651         rec->ctdb = ctdb;
2652         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
2653         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
2654
2655         rec->priority_time = timeval_current();
2656
2657         /* register a message port for sending memory dumps */
2658         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2659
2660         /* register a message port for recovery elections */
2661         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2662
2663         /* when nodes are disabled/enabled */
2664         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2665
2666         /* when we are asked to puch out a flag change */
2667         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2668
2669         /* when nodes are banned */
2670         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
2671
2672         /* and one for when nodes are unbanned */
2673         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
2674
2675         /* register a message port for vacuum fetch */
2676         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2677
2678         /* register a message port for reloadnodes  */
2679         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2680
2681         /* register a message port for performing a takeover run */
2682         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2683
2684 again:
2685         if (mem_ctx) {
2686                 talloc_free(mem_ctx);
2687                 mem_ctx = NULL;
2688         }
2689         mem_ctx = talloc_new(ctdb);
2690         if (!mem_ctx) {
2691                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2692                 exit(-1);
2693         }
2694
2695         /* we only check for recovery once every second */
2696         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2697
2698         /* verify that the main daemon is still running */
2699         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2700                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2701                 exit(-1);
2702         }
2703
2704         /* ping the local daemon to tell it we are alive */
2705         ctdb_ctrl_recd_ping(ctdb);
2706
2707         if (rec->election_timeout) {
2708                 /* an election is in progress */
2709                 goto again;
2710         }
2711
2712         /* read the debug level from the parent and update locally */
2713         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2714         if (ret !=0) {
2715                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2716                 goto again;
2717         }
2718         LogLevel = debug_level;
2719
2720
2721         /* We must check if we need to ban a node here but we want to do this
2722            as early as possible so we dont wait until we have pulled the node
2723            map from the local node. thats why we have the hardcoded value 20
2724         */
2725         if (rec->culprit_counter > 20) {
2726                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
2727                          rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
2728                          ctdb->tunable.recovery_ban_period));
2729                 ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
2730         }
2731
2732         /* get relevant tunables */
2733         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2734         if (ret != 0) {
2735                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2736                 goto again;
2737         }
2738
2739         /* get the current recovery lock file from the server */
2740         if (update_recovery_lock_file(ctdb) != 0) {
2741                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2742                 goto again;
2743         }
2744
2745         /* Make sure that if recovery lock verification becomes disabled when
2746            we close the file
2747         */
2748         if (ctdb->tunable.verify_recovery_lock == 0) {
2749                 if (ctdb->recovery_lock_fd != -1) {
2750                         close(ctdb->recovery_lock_fd);
2751                         ctdb->recovery_lock_fd = -1;
2752                 }
2753         }
2754
2755         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2756         if (pnn == (uint32_t)-1) {
2757                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2758                 goto again;
2759         }
2760
2761         /* get the vnnmap */
2762         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2763         if (ret != 0) {
2764                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2765                 goto again;
2766         }
2767
2768
2769         /* get number of nodes */
2770         if (rec->nodemap) {
2771                 talloc_free(rec->nodemap);
2772                 rec->nodemap = NULL;
2773                 nodemap=NULL;
2774         }
2775         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2776         if (ret != 0) {
2777                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2778                 goto again;
2779         }
2780         nodemap = rec->nodemap;
2781
2782         /* check which node is the recovery master */
2783         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2784         if (ret != 0) {
2785                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2786                 goto again;
2787         }
2788
2789         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
2790         if (rec->recmaster != pnn) {
2791                 if (rec->ip_reallocate_ctx != NULL) {
2792                         talloc_free(rec->ip_reallocate_ctx);
2793                         rec->ip_reallocate_ctx = NULL;
2794                         rec->reallocate_callers = NULL;
2795                 }
2796         }
2797         /* if there are takeovers requested, perform it and notify the waiters */
2798         if (rec->reallocate_callers) {
2799                 process_ipreallocate_requests(ctdb, rec);
2800         }
2801
2802         if (rec->recmaster == (uint32_t)-1) {
2803                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2804                 force_election(rec, pnn, nodemap);
2805                 goto again;
2806         }
2807
2808
2809         /* if the local daemon is STOPPED, we verify that the databases are
2810            also frozen and thet the recmode is set to active 
2811         */
2812         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
2813                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2814                 if (ret != 0) {
2815                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
2816                 }
2817                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2818                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
2819
2820                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2821                         if (ret != 0) {
2822                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
2823                                 goto again;
2824                         }
2825                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2826                         if (ret != 0) {
2827                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
2828
2829                                 goto again;
2830                         }
2831                         goto again;
2832                 }
2833         }
2834
2835         
2836         /* check that we (recovery daemon) and the local ctdb daemon
2837            agrees on whether we are banned or not
2838         */
2839         if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
2840                 if (rec->banned_nodes[pnn] == NULL) {
2841                         if (rec->recmaster == pnn) {
2842                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2843
2844                                 ctdb_unban_node(rec, pnn);
2845                         } else {
2846                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2847                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2848                                 ctdb_set_culprit(rec, pnn);
2849                         }
2850                         goto again;
2851                 }
2852         } else {
2853                 if (rec->banned_nodes[pnn] != NULL) {
2854                         if (rec->recmaster == pnn) {
2855                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2856
2857                                 ctdb_unban_node(rec, pnn);
2858                         } else {
2859                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2860
2861                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2862                                 ctdb_set_culprit(rec, pnn);
2863                         }
2864                         goto again;
2865                 }
2866         }
2867
2868         /* remember our own node flags */
2869         rec->node_flags = nodemap->nodes[pnn].flags;
2870
2871         /* count how many active nodes there are */
2872         rec->num_active    = 0;
2873         rec->num_connected = 0;
2874         for (i=0; i<nodemap->num; i++) {
2875                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2876                         rec->num_active++;
2877                 }
2878                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2879                         rec->num_connected++;
2880                 }
2881         }
2882
2883
2884         /* verify that the recmaster node is still active */
2885         for (j=0; j<nodemap->num; j++) {
2886                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2887                         break;
2888                 }
2889         }
2890
2891         if (j == nodemap->num) {
2892                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2893                 force_election(rec, pnn, nodemap);
2894                 goto again;
2895         }
2896
2897         /* if recovery master is disconnected we must elect a new recmaster */
2898         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2899                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2900                 force_election(rec, pnn, nodemap);
2901                 goto again;
2902         }
2903
2904         /* grap the nodemap from the recovery master to check if it is banned */
2905         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2906                                    mem_ctx, &recmaster_nodemap);
2907         if (ret != 0) {
2908                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2909                           nodemap->nodes[j].pnn));
2910                 goto again;
2911         }
2912
2913
2914         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2915                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2916                 force_election(rec, pnn, nodemap);
2917                 goto again;
2918         }
2919
2920
2921         /* verify that we have all ip addresses we should have and we dont
2922          * have addresses we shouldnt have.
2923          */ 
2924         if (ctdb->do_checkpublicip) {
2925                 if (verify_ip_allocation(ctdb, pnn) != 0) {
2926                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
2927                         goto again;
2928                 }
2929         }
2930
2931
2932         /* if we are not the recmaster then we do not need to check
2933            if recovery is needed
2934          */
2935         if (pnn != rec->recmaster) {
2936                 goto again;
2937         }
2938
2939
2940         /* ensure our local copies of flags are right */
2941         ret = update_local_flags(rec, nodemap);
2942         if (ret == MONITOR_ELECTION_NEEDED) {
2943                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2944                 force_election(rec, pnn, nodemap);
2945                 goto again;
2946         }
2947         if (ret != MONITOR_OK) {
2948                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2949                 goto again;
2950         }
2951
2952         /* update the list of public ips that a node can handle for
2953            all connected nodes
2954         */
2955         if (ctdb->num_nodes != nodemap->num) {
2956                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2957                 reload_nodes_file(ctdb);
2958                 goto again;
2959         }
2960         for (j=0; j<nodemap->num; j++) {
2961                 /* release any existing data */
2962                 if (ctdb->nodes[j]->public_ips) {
2963                         talloc_free(ctdb->nodes[j]->public_ips);
2964                         ctdb->nodes[j]->public_ips = NULL;
2965                 }
2966
2967                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2968                         continue;
2969                 }
2970
2971                 /* grab a new shiny list of public ips from the node */
2972                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
2973                         ctdb->nodes[j]->pnn, 
2974                         ctdb->nodes,
2975                         &ctdb->nodes[j]->public_ips)) {
2976                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
2977                                 ctdb->nodes[j]->pnn));
2978                         goto again;
2979                 }
2980         }
2981
2982
2983         /* verify that all active nodes agree that we are the recmaster */
2984         switch (verify_recmaster(rec, nodemap, pnn)) {
2985         case MONITOR_RECOVERY_NEEDED:
2986                 /* can not happen */
2987                 goto again;
2988         case MONITOR_ELECTION_NEEDED:
2989                 force_election(rec, pnn, nodemap);
2990                 goto again;
2991         case MONITOR_OK:
2992                 break;
2993         case MONITOR_FAILED:
2994                 goto again;
2995         }
2996
2997
2998         if (rec->need_recovery) {
2999                 /* a previous recovery didn't finish */
3000                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, -1);
3001                 goto again;             
3002         }
3003
3004         /* verify that all active nodes are in normal mode 
3005            and not in recovery mode 
3006          */
3007         switch (verify_recmode(ctdb, nodemap)) {
3008         case MONITOR_RECOVERY_NEEDED:
3009                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
3010                 goto again;
3011         case MONITOR_FAILED:
3012                 goto again;
3013         case MONITOR_ELECTION_NEEDED:
3014                 /* can not happen */
3015         case MONITOR_OK:
3016                 break;
3017         }
3018
3019
3020         if (ctdb->tunable.verify_recovery_lock != 0) {
3021                 /* we should have the reclock - check its not stale */
3022                 ret = check_recovery_lock(ctdb);
3023                 if (ret != 0) {
3024                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3025                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
3026                         goto again;
3027                 }
3028         }
3029
3030         /* get the nodemap for all active remote nodes
3031          */
3032         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3033         if (remote_nodemaps == NULL) {
3034                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3035                 goto again;
3036         }
3037         for(i=0; i<nodemap->num; i++) {
3038                 remote_nodemaps[i] = NULL;
3039         }
3040         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3041                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3042                 goto again;
3043         } 
3044
3045         /* verify that all other nodes have the same nodemap as we have
3046         */
3047         for (j=0; j<nodemap->num; j++) {
3048                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3049                         continue;
3050                 }
3051
3052                 if (remote_nodemaps[j] == NULL) {
3053                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3054                         ctdb_set_culprit(rec, j);
3055
3056                         goto again;
3057                 }
3058
3059                 /* if the nodes disagree on how many nodes there are
3060                    then this is a good reason to try recovery
3061                  */
3062                 if (remote_nodemaps[j]->num != nodemap->num) {
3063                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3064                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3065                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
3066                         goto again;
3067                 }
3068
3069                 /* if the nodes disagree on which nodes exist and are
3070                    active, then that is also a good reason to do recovery
3071                  */
3072                 for (i=0;i<nodemap->num;i++) {
3073                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3074                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3075                                           nodemap->nodes[j].pnn, i, 
3076                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3077                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3078                                             vnnmap, nodemap->nodes[j].pnn);
3079                                 goto again;
3080                         }
3081                 }
3082
3083                 /* verify the flags are consistent
3084                 */
3085                 for (i=0; i<nodemap->num; i++) {
3086                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3087                                 continue;
3088                         }
3089                         
3090                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3091                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3092                                   nodemap->nodes[j].pnn, 
3093                                   nodemap->nodes[i].pnn, 
3094                                   remote_nodemaps[j]->nodes[i].flags,
3095                                   nodemap->nodes[j].flags));
3096                                 if (i == j) {
3097                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3098                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3099                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3100                                                     vnnmap, nodemap->nodes[j].pnn);
3101                                         goto again;
3102                                 } else {
3103                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3104                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3105                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3106                                                     vnnmap, nodemap->nodes[j].pnn);
3107                                         goto again;
3108                                 }
3109                         }
3110                 }
3111         }
3112
3113
3114         /* there better be the same number of lmasters in the vnn map
3115            as there are active nodes or we will have to do a recovery
3116          */
3117         if (vnnmap->size != rec->num_active) {
3118                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3119                           vnnmap->size, rec->num_active));
3120                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
3121                 goto again;
3122         }
3123
3124         /* verify that all active nodes in the nodemap also exist in 
3125            the vnnmap.
3126          */
3127         for (j=0; j<nodemap->num; j++) {
3128                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3129                         continue;
3130                 }
3131                 if (nodemap->nodes[j].pnn == pnn) {
3132                         continue;
3133                 }
3134
3135                 for (i=0; i<vnnmap->size; i++) {
3136                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3137                                 break;
3138                         }
3139                 }
3140                 if (i == vnnmap->size) {
3141                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3142                                   nodemap->nodes[j].pnn));
3143                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
3144                         goto again;
3145                 }
3146         }
3147
3148         
3149         /* verify that all other nodes have the same vnnmap
3150            and are from the same generation
3151          */
3152         for (j=0; j<nodemap->num; j++) {
3153                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3154                         continue;
3155                 }
3156                 if (nodemap->nodes[j].pnn == pnn) {
3157                         continue;
3158                 }
3159
3160                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3161                                           mem_ctx, &remote_vnnmap);
3162                 if (ret != 0) {
3163                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3164                                   nodemap->nodes[j].pnn));
3165                         goto again;
3166                 }
3167
3168                 /* verify the vnnmap generation is the same */
3169                 if (vnnmap->generation != remote_vnnmap->generation) {
3170                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3171                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3172                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
3173                         goto again;
3174                 }
3175
3176                 /* verify the vnnmap size is the same */
3177                 if (vnnmap->size != remote_vnnmap->size) {
3178                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3179                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3180                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
3181                         goto again;
3182                 }
3183
3184                 /* verify the vnnmap is the same */
3185                 for (i=0;i<vnnmap->size;i++) {
3186                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3187                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3188                                           nodemap->nodes[j].pnn));
3189                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3190                                             vnnmap, nodemap->nodes[j].pnn);
3191                                 goto again;
3192                         }
3193                 }
3194         }
3195
3196         /* we might need to change who has what IP assigned */
3197         if (rec->need_takeover_run) {
3198                 rec->need_takeover_run = false;
3199
3200                 /* execute the "startrecovery" event script on all nodes */
3201                 ret = run_startrecovery_eventscript(rec, nodemap);
3202                 if (ret!=0) {
3203                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3204                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3205                                     vnnmap, ctdb->pnn);
3206                 }
3207
3208                 ret = ctdb_takeover_run(ctdb, nodemap);
3209                 if (ret != 0) {
3210                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3211                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3212                                     vnnmap, ctdb->pnn);
3213                 }
3214
3215                 /* execute the "recovered" event script on all nodes */
3216                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3217 #if 0
3218 // we cant check whether the event completed successfully
3219 // since this script WILL fail if the node is in recovery mode
3220 // and if that race happens, the code here would just cause a second
3221 // cascading recovery.
3222                 if (ret!=0) {
3223                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3224                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3225                                     vnnmap, ctdb->pnn);
3226                 }
3227 #endif
3228         }
3229
3230
3231         goto again;
3232
3233 }
3234
3235 /*
3236   event handler for when the main ctdbd dies
3237  */
3238 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3239                                  uint16_t flags, void *private_data)
3240 {
3241         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3242         _exit(1);
3243 }
3244
3245 /*
3246   called regularly to verify that the recovery daemon is still running
3247  */
3248 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3249                               struct timeval yt, void *p)
3250 {
3251         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3252
3253         if (kill(ctdb->recoverd_pid, 0) != 0) {
3254                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3255
3256                 ctdb_stop_recoverd(ctdb);
3257                 ctdb_stop_keepalive(ctdb);
3258                 ctdb_stop_monitoring(ctdb);
3259                 ctdb_release_all_ips(ctdb);
3260                 if (ctdb->methods != NULL) {
3261                         ctdb->methods->shutdown(ctdb);
3262                 }
3263                 ctdb_event_script(ctdb, "shutdown");
3264
3265                 exit(10);       
3266         }
3267
3268         event_add_timed(ctdb->ev, ctdb, 
3269                         timeval_current_ofs(30, 0),
3270                         ctdb_check_recd, ctdb);
3271 }
3272
3273 static void recd_sig_child_handler(struct event_context *ev,
3274         struct signal_event *se, int signum, int count,
3275         void *dont_care, 
3276         void *private_data)
3277 {
3278 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3279         int status;
3280         pid_t pid = -1;
3281
3282         while (pid != 0) {
3283                 pid = waitpid(-1, &status, WNOHANG);
3284                 if (pid == -1) {
3285                         if (errno != ECHILD) {
3286                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3287                         }
3288                         return;
3289                 }
3290                 if (pid > 0) {
3291                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3292                 }
3293         }
3294 }
3295
3296 /*
3297   startup the recovery daemon as a child of the main ctdb daemon
3298  */
3299 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3300 {
3301         int fd[2];
3302         struct signal_event *se;
3303
3304         if (pipe(fd) != 0) {
3305                 return -1;
3306         }
3307
3308         ctdb->ctdbd_pid = getpid();
3309
3310         ctdb->recoverd_pid = fork();
3311         if (ctdb->recoverd_pid == -1) {
3312                 return -1;
3313         }
3314         
3315         if (ctdb->recoverd_pid != 0) {
3316                 close(fd[0]);
3317                 event_add_timed(ctdb->ev, ctdb, 
3318                                 timeval_current_ofs(30, 0),
3319                                 ctdb_check_recd, ctdb);
3320                 return 0;
3321         }
3322
3323         close(fd[1]);
3324
3325         srandom(getpid() ^ time(NULL));
3326
3327         if (switch_from_server_to_client(ctdb) != 0) {
3328                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3329                 exit(1);
3330         }
3331
3332         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3333                      ctdb_recoverd_parent, &fd[0]);     
3334
3335         /* set up a handler to pick up sigchld */
3336         se = event_add_signal(ctdb->ev, ctdb,
3337                                      SIGCHLD, 0,
3338                                      recd_sig_child_handler,
3339                                      ctdb);
3340         if (se == NULL) {
3341                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3342                 exit(1);
3343         }
3344
3345         monitor_cluster(ctdb);
3346
3347         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3348         return -1;
3349 }
3350
3351 /*
3352   shutdown the recovery daemon
3353  */
3354 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3355 {
3356         if (ctdb->recoverd_pid == 0) {
3357                 return;
3358         }
3359
3360         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3361         kill(ctdb->recoverd_pid, SIGTERM);
3362 }