when we detect a ip-allocation mismatch, just force a new ip reassignment
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67 };
68
69 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
70 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
71
72
73 /*
74   ban a node for a period of time
75  */
76 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
77 {
78         int ret;
79         struct ctdb_context *ctdb = rec->ctdb;
80         struct ctdb_ban_time bantime;
81        
82         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
83
84         if (!ctdb_validate_pnn(ctdb, pnn)) {
85                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
86                 return;
87         }
88
89         bantime.pnn  = pnn;
90         bantime.time = ban_time;
91
92         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
93         if (ret != 0) {
94                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
95                 return;
96         }
97
98 }
99
100 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
101
102
103 /*
104   run the "recovered" eventscript on all nodes
105  */
106 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
107 {
108         TALLOC_CTX *tmp_ctx;
109         uint32_t *nodes;
110
111         tmp_ctx = talloc_new(ctdb);
112         CTDB_NO_MEMORY(ctdb, tmp_ctx);
113
114         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
115         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
116                                         nodes, 0,
117                                         CONTROL_TIMEOUT(), false, tdb_null,
118                                         NULL, NULL,
119                                         NULL) != 0) {
120                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
121
122                 talloc_free(tmp_ctx);
123                 return -1;
124         }
125
126         talloc_free(tmp_ctx);
127         return 0;
128 }
129
130 /*
131   remember the trouble maker
132  */
133 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
134 {
135         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
136         struct ctdb_banning_state *ban_state;
137
138         if (culprit > ctdb->num_nodes) {
139                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
140                 return;
141         }
142
143         if (ctdb->nodes[culprit]->ban_state == NULL) {
144                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
145                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
146
147                 
148         }
149         ban_state = ctdb->nodes[culprit]->ban_state;
150         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
151                 /* this was the first time in a long while this node
152                    misbehaved so we will forgive any old transgressions.
153                 */
154                 ban_state->count = 0;
155         }
156
157         ban_state->count += count;
158         ban_state->last_reported_time = timeval_current();
159         rec->last_culprit_node = culprit;
160 }
161
162 /*
163   remember the trouble maker
164  */
165 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
166 {
167         ctdb_set_culprit_count(rec, culprit, 1);
168 }
169
170
171 /* this callback is called for every node that failed to execute the
172    start recovery event
173 */
174 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
175 {
176         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
177
178         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
179
180         ctdb_set_culprit(rec, node_pnn);
181 }
182
183 /*
184   run the "startrecovery" eventscript on all nodes
185  */
186 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
187 {
188         TALLOC_CTX *tmp_ctx;
189         uint32_t *nodes;
190         struct ctdb_context *ctdb = rec->ctdb;
191
192         tmp_ctx = talloc_new(ctdb);
193         CTDB_NO_MEMORY(ctdb, tmp_ctx);
194
195         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
196         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
197                                         nodes, 0,
198                                         CONTROL_TIMEOUT(), false, tdb_null,
199                                         NULL,
200                                         startrecovery_fail_callback,
201                                         rec) != 0) {
202                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
203                 talloc_free(tmp_ctx);
204                 return -1;
205         }
206
207         talloc_free(tmp_ctx);
208         return 0;
209 }
210
211 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
212 {
213         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
214                 DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
215                 return;
216         }
217         if (node_pnn < ctdb->num_nodes) {
218                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
219         }
220 }
221
222 /*
223   update the node capabilities for all connected nodes
224  */
225 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
226 {
227         uint32_t *nodes;
228         TALLOC_CTX *tmp_ctx;
229
230         tmp_ctx = talloc_new(ctdb);
231         CTDB_NO_MEMORY(ctdb, tmp_ctx);
232
233         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
234         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
235                                         nodes, 0,
236                                         CONTROL_TIMEOUT(),
237                                         false, tdb_null,
238                                         async_getcap_callback, NULL,
239                                         NULL) != 0) {
240                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
241                 talloc_free(tmp_ctx);
242                 return -1;
243         }
244
245         talloc_free(tmp_ctx);
246         return 0;
247 }
248
249 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
250 {
251         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
252
253         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
254         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
255 }
256
257 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
258 {
259         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
260
261         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
262         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
263 }
264
265 /*
266   change recovery mode on all nodes
267  */
268 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
269 {
270         TDB_DATA data;
271         uint32_t *nodes;
272         TALLOC_CTX *tmp_ctx;
273
274         tmp_ctx = talloc_new(ctdb);
275         CTDB_NO_MEMORY(ctdb, tmp_ctx);
276
277         /* freeze all nodes */
278         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
279         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
280                 int i;
281
282                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
283                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
284                                                 nodes, i,
285                                                 CONTROL_TIMEOUT(),
286                                                 false, tdb_null,
287                                                 NULL,
288                                                 set_recmode_fail_callback,
289                                                 rec) != 0) {
290                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
291                                 talloc_free(tmp_ctx);
292                                 return -1;
293                         }
294                 }
295         }
296
297
298         data.dsize = sizeof(uint32_t);
299         data.dptr = (unsigned char *)&rec_mode;
300
301         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
302                                         nodes, 0,
303                                         CONTROL_TIMEOUT(),
304                                         false, data,
305                                         NULL, NULL,
306                                         NULL) != 0) {
307                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
308                 talloc_free(tmp_ctx);
309                 return -1;
310         }
311
312         talloc_free(tmp_ctx);
313         return 0;
314 }
315
316 /*
317   change recovery master on all node
318  */
319 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
320 {
321         TDB_DATA data;
322         TALLOC_CTX *tmp_ctx;
323         uint32_t *nodes;
324
325         tmp_ctx = talloc_new(ctdb);
326         CTDB_NO_MEMORY(ctdb, tmp_ctx);
327
328         data.dsize = sizeof(uint32_t);
329         data.dptr = (unsigned char *)&pnn;
330
331         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
332         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
333                                         nodes, 0,
334                                         CONTROL_TIMEOUT(), false, data,
335                                         NULL, NULL,
336                                         NULL) != 0) {
337                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
338                 talloc_free(tmp_ctx);
339                 return -1;
340         }
341
342         talloc_free(tmp_ctx);
343         return 0;
344 }
345
346 /* update all remote nodes to use the same db priority that we have
347    this can fail if the remove node has not yet been upgraded to 
348    support this function, so we always return success and never fail
349    a recovery if this call fails.
350 */
351 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
352         struct ctdb_node_map *nodemap, 
353         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
354 {
355         int db;
356         uint32_t *nodes;
357
358         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
359
360         /* step through all local databases */
361         for (db=0; db<dbmap->num;db++) {
362                 TDB_DATA data;
363                 struct ctdb_db_priority db_prio;
364                 int ret;
365
366                 db_prio.db_id     = dbmap->dbs[db].dbid;
367                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
368                 if (ret != 0) {
369                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
370                         continue;
371                 }
372
373                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
374
375                 data.dptr  = (uint8_t *)&db_prio;
376                 data.dsize = sizeof(db_prio);
377
378                 if (ctdb_client_async_control(ctdb,
379                                         CTDB_CONTROL_SET_DB_PRIORITY,
380                                         nodes, 0,
381                                         CONTROL_TIMEOUT(), false, data,
382                                         NULL, NULL,
383                                         NULL) != 0) {
384                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
385                 }
386         }
387
388         return 0;
389 }                       
390
391 /*
392   ensure all other nodes have attached to any databases that we have
393  */
394 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
395                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
396 {
397         int i, j, db, ret;
398         struct ctdb_dbid_map *remote_dbmap;
399
400         /* verify that all other nodes have all our databases */
401         for (j=0; j<nodemap->num; j++) {
402                 /* we dont need to ourself ourselves */
403                 if (nodemap->nodes[j].pnn == pnn) {
404                         continue;
405                 }
406                 /* dont check nodes that are unavailable */
407                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
408                         continue;
409                 }
410
411                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
412                                          mem_ctx, &remote_dbmap);
413                 if (ret != 0) {
414                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
415                         return -1;
416                 }
417
418                 /* step through all local databases */
419                 for (db=0; db<dbmap->num;db++) {
420                         const char *name;
421
422
423                         for (i=0;i<remote_dbmap->num;i++) {
424                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
425                                         break;
426                                 }
427                         }
428                         /* the remote node already have this database */
429                         if (i!=remote_dbmap->num) {
430                                 continue;
431                         }
432                         /* ok so we need to create this database */
433                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
434                                             mem_ctx, &name);
435                         if (ret != 0) {
436                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
437                                 return -1;
438                         }
439                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
440                                            mem_ctx, name, dbmap->dbs[db].persistent);
441                         if (ret != 0) {
442                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
443                                 return -1;
444                         }
445                 }
446         }
447
448         return 0;
449 }
450
451
452 /*
453   ensure we are attached to any databases that anyone else is attached to
454  */
455 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
456                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
457 {
458         int i, j, db, ret;
459         struct ctdb_dbid_map *remote_dbmap;
460
461         /* verify that we have all database any other node has */
462         for (j=0; j<nodemap->num; j++) {
463                 /* we dont need to ourself ourselves */
464                 if (nodemap->nodes[j].pnn == pnn) {
465                         continue;
466                 }
467                 /* dont check nodes that are unavailable */
468                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
469                         continue;
470                 }
471
472                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
473                                          mem_ctx, &remote_dbmap);
474                 if (ret != 0) {
475                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
476                         return -1;
477                 }
478
479                 /* step through all databases on the remote node */
480                 for (db=0; db<remote_dbmap->num;db++) {
481                         const char *name;
482
483                         for (i=0;i<(*dbmap)->num;i++) {
484                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
485                                         break;
486                                 }
487                         }
488                         /* we already have this db locally */
489                         if (i!=(*dbmap)->num) {
490                                 continue;
491                         }
492                         /* ok so we need to create this database and
493                            rebuild dbmap
494                          */
495                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
496                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
497                         if (ret != 0) {
498                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
499                                           nodemap->nodes[j].pnn));
500                                 return -1;
501                         }
502                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
503                                            remote_dbmap->dbs[db].persistent);
504                         if (ret != 0) {
505                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
506                                 return -1;
507                         }
508                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
509                         if (ret != 0) {
510                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
511                                 return -1;
512                         }
513                 }
514         }
515
516         return 0;
517 }
518
519
520 /*
521   pull the remote database contents from one node into the recdb
522  */
523 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
524                                     struct tdb_wrap *recdb, uint32_t dbid)
525 {
526         int ret;
527         TDB_DATA outdata;
528         struct ctdb_marshall_buffer *reply;
529         struct ctdb_rec_data *rec;
530         int i;
531         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
532
533         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
534                                CONTROL_TIMEOUT(), &outdata);
535         if (ret != 0) {
536                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
537                 talloc_free(tmp_ctx);
538                 return -1;
539         }
540
541         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
542
543         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
544                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
545                 talloc_free(tmp_ctx);
546                 return -1;
547         }
548         
549         rec = (struct ctdb_rec_data *)&reply->data[0];
550         
551         for (i=0;
552              i<reply->count;
553              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
554                 TDB_DATA key, data;
555                 struct ctdb_ltdb_header *hdr;
556                 TDB_DATA existing;
557                 
558                 key.dptr = &rec->data[0];
559                 key.dsize = rec->keylen;
560                 data.dptr = &rec->data[key.dsize];
561                 data.dsize = rec->datalen;
562                 
563                 hdr = (struct ctdb_ltdb_header *)data.dptr;
564
565                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
566                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
567                         talloc_free(tmp_ctx);
568                         return -1;
569                 }
570
571                 /* fetch the existing record, if any */
572                 existing = tdb_fetch(recdb->tdb, key);
573                 
574                 if (existing.dptr != NULL) {
575                         struct ctdb_ltdb_header header;
576                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
577                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
578                                          (unsigned)existing.dsize, srcnode));
579                                 free(existing.dptr);
580                                 talloc_free(tmp_ctx);
581                                 return -1;
582                         }
583                         header = *(struct ctdb_ltdb_header *)existing.dptr;
584                         free(existing.dptr);
585                         if (!(header.rsn < hdr->rsn ||
586                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
587                                 continue;
588                         }
589                 }
590                 
591                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
592                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
593                         talloc_free(tmp_ctx);
594                         return -1;                              
595                 }
596         }
597
598         talloc_free(tmp_ctx);
599
600         return 0;
601 }
602
603 /*
604   pull all the remote database contents into the recdb
605  */
606 static int pull_remote_database(struct ctdb_context *ctdb,
607                                 struct ctdb_recoverd *rec, 
608                                 struct ctdb_node_map *nodemap, 
609                                 struct tdb_wrap *recdb, uint32_t dbid)
610 {
611         int j;
612
613         /* pull all records from all other nodes across onto this node
614            (this merges based on rsn)
615         */
616         for (j=0; j<nodemap->num; j++) {
617                 /* dont merge from nodes that are unavailable */
618                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
619                         continue;
620                 }
621                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
622                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
623                                  nodemap->nodes[j].pnn));
624                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
625                         return -1;
626                 }
627         }
628         
629         return 0;
630 }
631
632
633 /*
634   update flags on all active nodes
635  */
636 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
637 {
638         int ret;
639
640         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
641                 if (ret != 0) {
642                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
643                 return -1;
644         }
645
646         return 0;
647 }
648
649 /*
650   ensure all nodes have the same vnnmap we do
651  */
652 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
653                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
654 {
655         int j, ret;
656
657         /* push the new vnn map out to all the nodes */
658         for (j=0; j<nodemap->num; j++) {
659                 /* dont push to nodes that are unavailable */
660                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
661                         continue;
662                 }
663
664                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
665                 if (ret != 0) {
666                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
667                         return -1;
668                 }
669         }
670
671         return 0;
672 }
673
674
675 struct vacuum_info {
676         struct vacuum_info *next, *prev;
677         struct ctdb_recoverd *rec;
678         uint32_t srcnode;
679         struct ctdb_db_context *ctdb_db;
680         struct ctdb_marshall_buffer *recs;
681         struct ctdb_rec_data *r;
682 };
683
684 static void vacuum_fetch_next(struct vacuum_info *v);
685
686 /*
687   called when a vacuum fetch has completed - just free it and do the next one
688  */
689 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
690 {
691         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
692         talloc_free(state);
693         vacuum_fetch_next(v);
694 }
695
696
697 /*
698   process the next element from the vacuum list
699 */
700 static void vacuum_fetch_next(struct vacuum_info *v)
701 {
702         struct ctdb_call call;
703         struct ctdb_rec_data *r;
704
705         while (v->recs->count) {
706                 struct ctdb_client_call_state *state;
707                 TDB_DATA data;
708                 struct ctdb_ltdb_header *hdr;
709
710                 ZERO_STRUCT(call);
711                 call.call_id = CTDB_NULL_FUNC;
712                 call.flags = CTDB_IMMEDIATE_MIGRATION;
713
714                 r = v->r;
715                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
716                 v->recs->count--;
717
718                 call.key.dptr = &r->data[0];
719                 call.key.dsize = r->keylen;
720
721                 /* ensure we don't block this daemon - just skip a record if we can't get
722                    the chainlock */
723                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
724                         continue;
725                 }
726
727                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
728                 if (data.dptr == NULL) {
729                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
730                         continue;
731                 }
732
733                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
734                         free(data.dptr);
735                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
736                         continue;
737                 }
738                 
739                 hdr = (struct ctdb_ltdb_header *)data.dptr;
740                 if (hdr->dmaster == v->rec->ctdb->pnn) {
741                         /* its already local */
742                         free(data.dptr);
743                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
744                         continue;
745                 }
746
747                 free(data.dptr);
748
749                 state = ctdb_call_send(v->ctdb_db, &call);
750                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
751                 if (state == NULL) {
752                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
753                         talloc_free(v);
754                         return;
755                 }
756                 state->async.fn = vacuum_fetch_callback;
757                 state->async.private_data = v;
758                 return;
759         }
760
761         talloc_free(v);
762 }
763
764
765 /*
766   destroy a vacuum info structure
767  */
768 static int vacuum_info_destructor(struct vacuum_info *v)
769 {
770         DLIST_REMOVE(v->rec->vacuum_info, v);
771         return 0;
772 }
773
774
775 /*
776   handler for vacuum fetch
777 */
778 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
779                                  TDB_DATA data, void *private_data)
780 {
781         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
782         struct ctdb_marshall_buffer *recs;
783         int ret, i;
784         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
785         const char *name;
786         struct ctdb_dbid_map *dbmap=NULL;
787         bool persistent = false;
788         struct ctdb_db_context *ctdb_db;
789         struct ctdb_rec_data *r;
790         uint32_t srcnode;
791         struct vacuum_info *v;
792
793         recs = (struct ctdb_marshall_buffer *)data.dptr;
794         r = (struct ctdb_rec_data *)&recs->data[0];
795
796         if (recs->count == 0) {
797                 talloc_free(tmp_ctx);
798                 return;
799         }
800
801         srcnode = r->reqid;
802
803         for (v=rec->vacuum_info;v;v=v->next) {
804                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
805                         /* we're already working on records from this node */
806                         talloc_free(tmp_ctx);
807                         return;
808                 }
809         }
810
811         /* work out if the database is persistent */
812         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
813         if (ret != 0) {
814                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
815                 talloc_free(tmp_ctx);
816                 return;
817         }
818
819         for (i=0;i<dbmap->num;i++) {
820                 if (dbmap->dbs[i].dbid == recs->db_id) {
821                         persistent = dbmap->dbs[i].persistent;
822                         break;
823                 }
824         }
825         if (i == dbmap->num) {
826                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
827                 talloc_free(tmp_ctx);
828                 return;         
829         }
830
831         /* find the name of this database */
832         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
833                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
834                 talloc_free(tmp_ctx);
835                 return;
836         }
837
838         /* attach to it */
839         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
840         if (ctdb_db == NULL) {
841                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
842                 talloc_free(tmp_ctx);
843                 return;
844         }
845
846         v = talloc_zero(rec, struct vacuum_info);
847         if (v == NULL) {
848                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
849                 talloc_free(tmp_ctx);
850                 return;
851         }
852
853         v->rec = rec;
854         v->srcnode = srcnode;
855         v->ctdb_db = ctdb_db;
856         v->recs = talloc_memdup(v, recs, data.dsize);
857         if (v->recs == NULL) {
858                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
859                 talloc_free(v);
860                 talloc_free(tmp_ctx);
861                 return;         
862         }
863         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
864
865         DLIST_ADD(rec->vacuum_info, v);
866
867         talloc_set_destructor(v, vacuum_info_destructor);
868
869         vacuum_fetch_next(v);
870         talloc_free(tmp_ctx);
871 }
872
873
874 /*
875   called when ctdb_wait_timeout should finish
876  */
877 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
878                               struct timeval yt, void *p)
879 {
880         uint32_t *timed_out = (uint32_t *)p;
881         (*timed_out) = 1;
882 }
883
884 /*
885   wait for a given number of seconds
886  */
887 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
888 {
889         uint32_t timed_out = 0;
890         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
891         while (!timed_out) {
892                 event_loop_once(ctdb->ev);
893         }
894 }
895
896 /*
897   called when an election times out (ends)
898  */
899 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
900                                   struct timeval t, void *p)
901 {
902         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
903         rec->election_timeout = NULL;
904
905         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
906 }
907
908
909 /*
910   wait for an election to finish. It finished election_timeout seconds after
911   the last election packet is received
912  */
913 static void ctdb_wait_election(struct ctdb_recoverd *rec)
914 {
915         struct ctdb_context *ctdb = rec->ctdb;
916         while (rec->election_timeout) {
917                 event_loop_once(ctdb->ev);
918         }
919 }
920
921 /*
922   Update our local flags from all remote connected nodes. 
923   This is only run when we are or we belive we are the recovery master
924  */
925 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
926 {
927         int j;
928         struct ctdb_context *ctdb = rec->ctdb;
929         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
930
931         /* get the nodemap for all active remote nodes and verify
932            they are the same as for this node
933          */
934         for (j=0; j<nodemap->num; j++) {
935                 struct ctdb_node_map *remote_nodemap=NULL;
936                 int ret;
937
938                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
939                         continue;
940                 }
941                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
942                         continue;
943                 }
944
945                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
946                                            mem_ctx, &remote_nodemap);
947                 if (ret != 0) {
948                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
949                                   nodemap->nodes[j].pnn));
950                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
951                         talloc_free(mem_ctx);
952                         return MONITOR_FAILED;
953                 }
954                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
955                         /* We should tell our daemon about this so it
956                            updates its flags or else we will log the same 
957                            message again in the next iteration of recovery.
958                            Since we are the recovery master we can just as
959                            well update the flags on all nodes.
960                         */
961                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
962                         if (ret != 0) {
963                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
964                                 return -1;
965                         }
966
967                         /* Update our local copy of the flags in the recovery
968                            daemon.
969                         */
970                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
971                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
972                                  nodemap->nodes[j].flags));
973                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
974                 }
975                 talloc_free(remote_nodemap);
976         }
977         talloc_free(mem_ctx);
978         return MONITOR_OK;
979 }
980
981
982 /* Create a new random generation ip. 
983    The generation id can not be the INVALID_GENERATION id
984 */
985 static uint32_t new_generation(void)
986 {
987         uint32_t generation;
988
989         while (1) {
990                 generation = random();
991
992                 if (generation != INVALID_GENERATION) {
993                         break;
994                 }
995         }
996
997         return generation;
998 }
999
1000
1001 /*
1002   create a temporary working database
1003  */
1004 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1005 {
1006         char *name;
1007         struct tdb_wrap *recdb;
1008         unsigned tdb_flags;
1009
1010         /* open up the temporary recovery database */
1011         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
1012         if (name == NULL) {
1013                 return NULL;
1014         }
1015         unlink(name);
1016
1017         tdb_flags = TDB_NOLOCK;
1018         if (!ctdb->do_setsched) {
1019                 tdb_flags |= TDB_NOMMAP;
1020         }
1021
1022         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1023                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1024         if (recdb == NULL) {
1025                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1026         }
1027
1028         talloc_free(name);
1029
1030         return recdb;
1031 }
1032
1033
1034 /* 
1035    a traverse function for pulling all relevent records from recdb
1036  */
1037 struct recdb_data {
1038         struct ctdb_context *ctdb;
1039         struct ctdb_marshall_buffer *recdata;
1040         uint32_t len;
1041         bool failed;
1042 };
1043
1044 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1045 {
1046         struct recdb_data *params = (struct recdb_data *)p;
1047         struct ctdb_rec_data *rec;
1048         struct ctdb_ltdb_header *hdr;
1049
1050         /* skip empty records */
1051         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1052                 return 0;
1053         }
1054
1055         /* update the dmaster field to point to us */
1056         hdr = (struct ctdb_ltdb_header *)data.dptr;
1057         hdr->dmaster = params->ctdb->pnn;
1058
1059         /* add the record to the blob ready to send to the nodes */
1060         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1061         if (rec == NULL) {
1062                 params->failed = true;
1063                 return -1;
1064         }
1065         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1066         if (params->recdata == NULL) {
1067                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1068                          rec->length + params->len, params->recdata->count));
1069                 params->failed = true;
1070                 return -1;
1071         }
1072         params->recdata->count++;
1073         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1074         params->len += rec->length;
1075         talloc_free(rec);
1076
1077         return 0;
1078 }
1079
1080 /*
1081   push the recdb database out to all nodes
1082  */
1083 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1084                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1085 {
1086         struct recdb_data params;
1087         struct ctdb_marshall_buffer *recdata;
1088         TDB_DATA outdata;
1089         TALLOC_CTX *tmp_ctx;
1090         uint32_t *nodes;
1091
1092         tmp_ctx = talloc_new(ctdb);
1093         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1094
1095         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1096         CTDB_NO_MEMORY(ctdb, recdata);
1097
1098         recdata->db_id = dbid;
1099
1100         params.ctdb = ctdb;
1101         params.recdata = recdata;
1102         params.len = offsetof(struct ctdb_marshall_buffer, data);
1103         params.failed = false;
1104
1105         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1106                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1107                 talloc_free(params.recdata);
1108                 talloc_free(tmp_ctx);
1109                 return -1;
1110         }
1111
1112         if (params.failed) {
1113                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1114                 talloc_free(params.recdata);
1115                 talloc_free(tmp_ctx);
1116                 return -1;              
1117         }
1118
1119         recdata = params.recdata;
1120
1121         outdata.dptr = (void *)recdata;
1122         outdata.dsize = params.len;
1123
1124         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1125         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1126                                         nodes, 0,
1127                                         CONTROL_TIMEOUT(), false, outdata,
1128                                         NULL, NULL,
1129                                         NULL) != 0) {
1130                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1131                 talloc_free(recdata);
1132                 talloc_free(tmp_ctx);
1133                 return -1;
1134         }
1135
1136         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1137                   dbid, recdata->count));
1138
1139         talloc_free(recdata);
1140         talloc_free(tmp_ctx);
1141
1142         return 0;
1143 }
1144
1145
1146 /*
1147   go through a full recovery on one database 
1148  */
1149 static int recover_database(struct ctdb_recoverd *rec, 
1150                             TALLOC_CTX *mem_ctx,
1151                             uint32_t dbid,
1152                             uint32_t pnn, 
1153                             struct ctdb_node_map *nodemap,
1154                             uint32_t transaction_id)
1155 {
1156         struct tdb_wrap *recdb;
1157         int ret;
1158         struct ctdb_context *ctdb = rec->ctdb;
1159         TDB_DATA data;
1160         struct ctdb_control_wipe_database w;
1161         uint32_t *nodes;
1162
1163         recdb = create_recdb(ctdb, mem_ctx);
1164         if (recdb == NULL) {
1165                 return -1;
1166         }
1167
1168         /* pull all remote databases onto the recdb */
1169         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid);
1170         if (ret != 0) {
1171                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1172                 return -1;
1173         }
1174
1175         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1176
1177         /* wipe all the remote databases. This is safe as we are in a transaction */
1178         w.db_id = dbid;
1179         w.transaction_id = transaction_id;
1180
1181         data.dptr = (void *)&w;
1182         data.dsize = sizeof(w);
1183
1184         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1185         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1186                                         nodes, 0,
1187                                         CONTROL_TIMEOUT(), false, data,
1188                                         NULL, NULL,
1189                                         NULL) != 0) {
1190                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1191                 talloc_free(recdb);
1192                 return -1;
1193         }
1194         
1195         /* push out the correct database. This sets the dmaster and skips 
1196            the empty records */
1197         ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
1198         if (ret != 0) {
1199                 talloc_free(recdb);
1200                 return -1;
1201         }
1202
1203         /* all done with this database */
1204         talloc_free(recdb);
1205
1206         return 0;
1207 }
1208
1209 /*
1210   reload the nodes file 
1211 */
1212 static void reload_nodes_file(struct ctdb_context *ctdb)
1213 {
1214         ctdb->nodes = NULL;
1215         ctdb_load_nodes_file(ctdb);
1216 }
1217
1218         
1219 /*
1220   we are the recmaster, and recovery is needed - start a recovery run
1221  */
1222 static int do_recovery(struct ctdb_recoverd *rec, 
1223                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1224                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1225 {
1226         struct ctdb_context *ctdb = rec->ctdb;
1227         int i, j, ret;
1228         uint32_t generation;
1229         struct ctdb_dbid_map *dbmap;
1230         TDB_DATA data;
1231         uint32_t *nodes;
1232         struct timeval start_time;
1233
1234         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1235
1236         /* if recovery fails, force it again */
1237         rec->need_recovery = true;
1238
1239         for (i=0; i<ctdb->num_nodes; i++) {
1240                 struct ctdb_banning_state *ban_state;
1241
1242                 if (ctdb->nodes[i]->ban_state == NULL) {
1243                         continue;
1244                 }
1245                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1246                 if (ban_state->count < 2*ctdb->num_nodes) {
1247                         continue;
1248                 }
1249                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1250                         ctdb->nodes[i]->pnn, ban_state->count,
1251                         ctdb->tunable.recovery_ban_period));
1252                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1253                 ban_state->count = 0;
1254         }
1255
1256
1257         if (ctdb->tunable.verify_recovery_lock != 0) {
1258                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1259                 start_time = timeval_current();
1260                 if (!ctdb_recovery_lock(ctdb, true)) {
1261                         ctdb_set_culprit(rec, pnn);
1262                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1263                         return -1;
1264                 }
1265                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1266                 DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1267         }
1268
1269         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1270
1271         /* get a list of all databases */
1272         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1273         if (ret != 0) {
1274                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1275                 return -1;
1276         }
1277
1278         /* we do the db creation before we set the recovery mode, so the freeze happens
1279            on all databases we will be dealing with. */
1280
1281         /* verify that we have all the databases any other node has */
1282         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1283         if (ret != 0) {
1284                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1285                 return -1;
1286         }
1287
1288         /* verify that all other nodes have all our databases */
1289         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1290         if (ret != 0) {
1291                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1292                 return -1;
1293         }
1294         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1295
1296         /* update the database priority for all remote databases */
1297         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1298         if (ret != 0) {
1299                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1300         }
1301         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1302
1303
1304         /* set recovery mode to active on all nodes */
1305         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1306         if (ret != 0) {
1307                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1308                 return -1;
1309         }
1310
1311         /* execute the "startrecovery" event script on all nodes */
1312         ret = run_startrecovery_eventscript(rec, nodemap);
1313         if (ret!=0) {
1314                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1315                 return -1;
1316         }
1317
1318         /* pick a new generation number */
1319         generation = new_generation();
1320
1321         /* change the vnnmap on this node to use the new generation 
1322            number but not on any other nodes.
1323            this guarantees that if we abort the recovery prematurely
1324            for some reason (a node stops responding?)
1325            that we can just return immediately and we will reenter
1326            recovery shortly again.
1327            I.e. we deliberately leave the cluster with an inconsistent
1328            generation id to allow us to abort recovery at any stage and
1329            just restart it from scratch.
1330          */
1331         vnnmap->generation = generation;
1332         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1333         if (ret != 0) {
1334                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1335                 return -1;
1336         }
1337
1338         data.dptr = (void *)&generation;
1339         data.dsize = sizeof(uint32_t);
1340
1341         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1342         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1343                                         nodes, 0,
1344                                         CONTROL_TIMEOUT(), false, data,
1345                                         NULL,
1346                                         transaction_start_fail_callback,
1347                                         rec) != 0) {
1348                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1349                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1350                                         nodes, 0,
1351                                         CONTROL_TIMEOUT(), false, tdb_null,
1352                                         NULL,
1353                                         NULL,
1354                                         NULL) != 0) {
1355                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1356                 }
1357                 return -1;
1358         }
1359
1360         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1361
1362         for (i=0;i<dbmap->num;i++) {
1363                 if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
1364                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1365                         return -1;
1366                 }
1367         }
1368
1369         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1370
1371         /* commit all the changes */
1372         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1373                                         nodes, 0,
1374                                         CONTROL_TIMEOUT(), false, data,
1375                                         NULL, NULL,
1376                                         NULL) != 0) {
1377                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1378                 return -1;
1379         }
1380
1381         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1382         
1383
1384         /* update the capabilities for all nodes */
1385         ret = update_capabilities(ctdb, nodemap);
1386         if (ret!=0) {
1387                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1388                 return -1;
1389         }
1390
1391         /* build a new vnn map with all the currently active and
1392            unbanned nodes */
1393         generation = new_generation();
1394         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1395         CTDB_NO_MEMORY(ctdb, vnnmap);
1396         vnnmap->generation = generation;
1397         vnnmap->size = 0;
1398         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1399         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1400         for (i=j=0;i<nodemap->num;i++) {
1401                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1402                         continue;
1403                 }
1404                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1405                         /* this node can not be an lmaster */
1406                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1407                         continue;
1408                 }
1409
1410                 vnnmap->size++;
1411                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1412                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1413                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1414
1415         }
1416         if (vnnmap->size == 0) {
1417                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1418                 vnnmap->size++;
1419                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1420                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1421                 vnnmap->map[0] = pnn;
1422         }       
1423
1424         /* update to the new vnnmap on all nodes */
1425         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1426         if (ret != 0) {
1427                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1428                 return -1;
1429         }
1430
1431         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1432
1433         /* update recmaster to point to us for all nodes */
1434         ret = set_recovery_master(ctdb, nodemap, pnn);
1435         if (ret!=0) {
1436                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1437                 return -1;
1438         }
1439
1440         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1441
1442         /*
1443           update all nodes to have the same flags that we have
1444          */
1445         for (i=0;i<nodemap->num;i++) {
1446                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1447                         continue;
1448                 }
1449
1450                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1451                 if (ret != 0) {
1452                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1453                         return -1;
1454                 }
1455         }
1456
1457         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1458
1459         /* disable recovery mode */
1460         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1461         if (ret != 0) {
1462                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1463                 return -1;
1464         }
1465
1466         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1467
1468         /*
1469           tell nodes to takeover their public IPs
1470          */
1471         rec->need_takeover_run = false;
1472         ret = ctdb_takeover_run(ctdb, nodemap);
1473         if (ret != 0) {
1474                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1475                 return -1;
1476         }
1477         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1478
1479         /* execute the "recovered" event script on all nodes */
1480         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1481         if (ret!=0) {
1482                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1483                 return -1;
1484         }
1485
1486         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1487
1488         /* send a message to all clients telling them that the cluster 
1489            has been reconfigured */
1490         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1491
1492         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1493
1494         rec->need_recovery = false;
1495
1496         /* we managed to complete a full recovery, make sure to forgive
1497            any past sins by the nodes that could now participate in the
1498            recovery.
1499         */
1500         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1501         for (i=0;i<nodemap->num;i++) {
1502                 struct ctdb_banning_state *ban_state;
1503
1504                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1505                         continue;
1506                 }
1507
1508                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1509                 if (ban_state == NULL) {
1510                         continue;
1511                 }
1512
1513                 ban_state->count = 0;
1514         }
1515
1516
1517         /* We just finished a recovery successfully. 
1518            We now wait for rerecovery_timeout before we allow 
1519            another recovery to take place.
1520         */
1521         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1522         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1523         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1524
1525         return 0;
1526 }
1527
1528
1529 /*
1530   elections are won by first checking the number of connected nodes, then
1531   the priority time, then the pnn
1532  */
1533 struct election_message {
1534         uint32_t num_connected;
1535         struct timeval priority_time;
1536         uint32_t pnn;
1537         uint32_t node_flags;
1538 };
1539
1540 /*
1541   form this nodes election data
1542  */
1543 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1544 {
1545         int ret, i;
1546         struct ctdb_node_map *nodemap;
1547         struct ctdb_context *ctdb = rec->ctdb;
1548
1549         ZERO_STRUCTP(em);
1550
1551         em->pnn = rec->ctdb->pnn;
1552         em->priority_time = rec->priority_time;
1553
1554         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1555         if (ret != 0) {
1556                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1557                 return;
1558         }
1559
1560         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1561         em->node_flags = rec->node_flags;
1562
1563         for (i=0;i<nodemap->num;i++) {
1564                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1565                         em->num_connected++;
1566                 }
1567         }
1568
1569         /* we shouldnt try to win this election if we cant be a recmaster */
1570         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1571                 em->num_connected = 0;
1572                 em->priority_time = timeval_current();
1573         }
1574
1575         talloc_free(nodemap);
1576 }
1577
1578 /*
1579   see if the given election data wins
1580  */
1581 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1582 {
1583         struct election_message myem;
1584         int cmp = 0;
1585
1586         ctdb_election_data(rec, &myem);
1587
1588         /* we cant win if we dont have the recmaster capability */
1589         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1590                 return false;
1591         }
1592
1593         /* we cant win if we are banned */
1594         if (rec->node_flags & NODE_FLAGS_BANNED) {
1595                 return false;
1596         }       
1597
1598         /* we cant win if we are stopped */
1599         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1600                 return false;
1601         }       
1602
1603         /* we will automatically win if the other node is banned */
1604         if (em->node_flags & NODE_FLAGS_BANNED) {
1605                 return true;
1606         }
1607
1608         /* we will automatically win if the other node is banned */
1609         if (em->node_flags & NODE_FLAGS_STOPPED) {
1610                 return true;
1611         }
1612
1613         /* try to use the most connected node */
1614         if (cmp == 0) {
1615                 cmp = (int)myem.num_connected - (int)em->num_connected;
1616         }
1617
1618         /* then the longest running node */
1619         if (cmp == 0) {
1620                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1621         }
1622
1623         if (cmp == 0) {
1624                 cmp = (int)myem.pnn - (int)em->pnn;
1625         }
1626
1627         return cmp > 0;
1628 }
1629
1630 /*
1631   send out an election request
1632  */
1633 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1634 {
1635         int ret;
1636         TDB_DATA election_data;
1637         struct election_message emsg;
1638         uint64_t srvid;
1639         struct ctdb_context *ctdb = rec->ctdb;
1640
1641         srvid = CTDB_SRVID_RECOVERY;
1642
1643         ctdb_election_data(rec, &emsg);
1644
1645         election_data.dsize = sizeof(struct election_message);
1646         election_data.dptr  = (unsigned char *)&emsg;
1647
1648
1649         /* send an election message to all active nodes */
1650         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1651         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1652
1653
1654         /* A new node that is already frozen has entered the cluster.
1655            The existing nodes are not frozen and dont need to be frozen
1656            until the election has ended and we start the actual recovery
1657         */
1658         if (update_recmaster == true) {
1659                 /* first we assume we will win the election and set 
1660                    recoverymaster to be ourself on the current node
1661                  */
1662                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1663                 if (ret != 0) {
1664                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1665                         return -1;
1666                 }
1667         }
1668
1669
1670         return 0;
1671 }
1672
1673 /*
1674   this function will unban all nodes in the cluster
1675 */
1676 static void unban_all_nodes(struct ctdb_context *ctdb)
1677 {
1678         int ret, i;
1679         struct ctdb_node_map *nodemap;
1680         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1681         
1682         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1683         if (ret != 0) {
1684                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1685                 return;
1686         }
1687
1688         for (i=0;i<nodemap->num;i++) {
1689                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1690                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1691                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1692                 }
1693         }
1694
1695         talloc_free(tmp_ctx);
1696 }
1697
1698
1699 /*
1700   we think we are winning the election - send a broadcast election request
1701  */
1702 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1703 {
1704         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1705         int ret;
1706
1707         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1708         if (ret != 0) {
1709                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1710         }
1711
1712         talloc_free(rec->send_election_te);
1713         rec->send_election_te = NULL;
1714 }
1715
1716 /*
1717   handler for memory dumps
1718 */
1719 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1720                              TDB_DATA data, void *private_data)
1721 {
1722         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1723         TDB_DATA *dump;
1724         int ret;
1725         struct rd_memdump_reply *rd;
1726
1727         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1728                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1729                 talloc_free(tmp_ctx);
1730                 return;
1731         }
1732         rd = (struct rd_memdump_reply *)data.dptr;
1733
1734         dump = talloc_zero(tmp_ctx, TDB_DATA);
1735         if (dump == NULL) {
1736                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1737                 talloc_free(tmp_ctx);
1738                 return;
1739         }
1740         ret = ctdb_dump_memory(ctdb, dump);
1741         if (ret != 0) {
1742                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1743                 talloc_free(tmp_ctx);
1744                 return;
1745         }
1746
1747 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1748
1749         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1750         if (ret != 0) {
1751                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1752                 talloc_free(tmp_ctx);
1753                 return;
1754         }
1755
1756         talloc_free(tmp_ctx);
1757 }
1758
1759 /*
1760   handler for reload_nodes
1761 */
1762 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1763                              TDB_DATA data, void *private_data)
1764 {
1765         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1766
1767         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1768
1769         reload_nodes_file(rec->ctdb);
1770 }
1771
1772
1773 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1774                               struct timeval yt, void *p)
1775 {
1776         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1777
1778         talloc_free(rec->ip_check_disable_ctx);
1779         rec->ip_check_disable_ctx = NULL;
1780 }
1781
1782 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1783                              TDB_DATA data, void *private_data)
1784 {
1785         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1786         uint32_t timeout;
1787
1788         if (rec->ip_check_disable_ctx != NULL) {
1789                 talloc_free(rec->ip_check_disable_ctx);
1790                 rec->ip_check_disable_ctx = NULL;
1791         }
1792
1793         if (data.dsize != sizeof(uint32_t)) {
1794                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1795                                  "expexting %lu\n", (long unsigned)data.dsize,
1796                                  (long unsigned)sizeof(uint32_t)));
1797                 return;
1798         }
1799         if (data.dptr == NULL) {
1800                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
1801                 return;
1802         }
1803
1804         timeout = *((uint32_t *)data.dptr);
1805         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1806
1807         rec->ip_check_disable_ctx = talloc_new(rec);
1808         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1809
1810         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1811 }
1812
1813
1814 /*
1815   handler for ip reallocate, just add it to the list of callers and 
1816   handle this later in the monitor_cluster loop so we do not recurse
1817   with other callers to takeover_run()
1818 */
1819 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1820                              TDB_DATA data, void *private_data)
1821 {
1822         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1823         struct ip_reallocate_list *caller;
1824
1825         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1826                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1827                 return;
1828         }
1829
1830         if (rec->ip_reallocate_ctx == NULL) {
1831                 rec->ip_reallocate_ctx = talloc_new(rec);
1832                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
1833         }
1834
1835         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1836         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1837
1838         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1839         caller->next = rec->reallocate_callers;
1840         rec->reallocate_callers = caller;
1841
1842         return;
1843 }
1844
1845 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1846 {
1847         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1848         TDB_DATA result;
1849         int32_t ret;
1850         struct ip_reallocate_list *callers;
1851
1852         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1853         ret = ctdb_takeover_run(ctdb, rec->nodemap);
1854         result.dsize = sizeof(int32_t);
1855         result.dptr  = (uint8_t *)&ret;
1856
1857         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
1858
1859                 /* Someone that sent srvid==0 does not want a reply */
1860                 if (callers->rd->srvid == 0) {
1861                         continue;
1862                 }
1863                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
1864                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
1865                                   (unsigned long long)callers->rd->srvid));
1866                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
1867                 if (ret != 0) {
1868                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
1869                                          "message to %u:%llu\n",
1870                                          (unsigned)callers->rd->pnn,
1871                                          (unsigned long long)callers->rd->srvid));
1872                 }
1873         }
1874
1875         talloc_free(tmp_ctx);
1876         talloc_free(rec->ip_reallocate_ctx);
1877         rec->ip_reallocate_ctx = NULL;
1878         rec->reallocate_callers = NULL;
1879         
1880 }
1881
1882
1883 /*
1884   handler for recovery master elections
1885 */
1886 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1887                              TDB_DATA data, void *private_data)
1888 {
1889         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1890         int ret;
1891         struct election_message *em = (struct election_message *)data.dptr;
1892         TALLOC_CTX *mem_ctx;
1893
1894         /* we got an election packet - update the timeout for the election */
1895         talloc_free(rec->election_timeout);
1896         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1897                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1898                                                 ctdb_election_timeout, rec);
1899
1900         mem_ctx = talloc_new(ctdb);
1901
1902         /* someone called an election. check their election data
1903            and if we disagree and we would rather be the elected node, 
1904            send a new election message to all other nodes
1905          */
1906         if (ctdb_election_win(rec, em)) {
1907                 if (!rec->send_election_te) {
1908                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1909                                                                 timeval_current_ofs(0, 500000),
1910                                                                 election_send_request, rec);
1911                 }
1912                 talloc_free(mem_ctx);
1913                 /*unban_all_nodes(ctdb);*/
1914                 return;
1915         }
1916         
1917         /* we didn't win */
1918         talloc_free(rec->send_election_te);
1919         rec->send_election_te = NULL;
1920
1921         if (ctdb->tunable.verify_recovery_lock != 0) {
1922                 /* release the recmaster lock */
1923                 if (em->pnn != ctdb->pnn &&
1924                     ctdb->recovery_lock_fd != -1) {
1925                         close(ctdb->recovery_lock_fd);
1926                         ctdb->recovery_lock_fd = -1;
1927                         unban_all_nodes(ctdb);
1928                 }
1929         }
1930
1931         /* ok, let that guy become recmaster then */
1932         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1933         if (ret != 0) {
1934                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
1935                 talloc_free(mem_ctx);
1936                 return;
1937         }
1938
1939         talloc_free(mem_ctx);
1940         return;
1941 }
1942
1943
1944 /*
1945   force the start of the election process
1946  */
1947 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1948                            struct ctdb_node_map *nodemap)
1949 {
1950         int ret;
1951         struct ctdb_context *ctdb = rec->ctdb;
1952
1953         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1954
1955         /* set all nodes to recovery mode to stop all internode traffic */
1956         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1957         if (ret != 0) {
1958                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1959                 return;
1960         }
1961
1962         talloc_free(rec->election_timeout);
1963         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1964                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1965                                                 ctdb_election_timeout, rec);
1966
1967         ret = send_election_request(rec, pnn, true);
1968         if (ret!=0) {
1969                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
1970                 return;
1971         }
1972
1973         /* wait for a few seconds to collect all responses */
1974         ctdb_wait_election(rec);
1975 }
1976
1977
1978
1979 /*
1980   handler for when a node changes its flags
1981 */
1982 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1983                             TDB_DATA data, void *private_data)
1984 {
1985         int ret;
1986         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1987         struct ctdb_node_map *nodemap=NULL;
1988         TALLOC_CTX *tmp_ctx;
1989         uint32_t changed_flags;
1990         int i;
1991         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1992         int disabled_flag_changed;
1993
1994         if (data.dsize != sizeof(*c)) {
1995                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1996                 return;
1997         }
1998
1999         tmp_ctx = talloc_new(ctdb);
2000         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2001
2002         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2003         if (ret != 0) {
2004                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2005                 talloc_free(tmp_ctx);
2006                 return;         
2007         }
2008
2009
2010         for (i=0;i<nodemap->num;i++) {
2011                 if (nodemap->nodes[i].pnn == c->pnn) break;
2012         }
2013
2014         if (i == nodemap->num) {
2015                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2016                 talloc_free(tmp_ctx);
2017                 return;
2018         }
2019
2020         changed_flags = c->old_flags ^ c->new_flags;
2021
2022         if (nodemap->nodes[i].flags != c->new_flags) {
2023                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2024         }
2025
2026         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2027
2028         nodemap->nodes[i].flags = c->new_flags;
2029
2030         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2031                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2032
2033         if (ret == 0) {
2034                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2035                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2036         }
2037         
2038         if (ret == 0 &&
2039             ctdb->recovery_master == ctdb->pnn &&
2040             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2041                 /* Only do the takeover run if the perm disabled or unhealthy
2042                    flags changed since these will cause an ip failover but not
2043                    a recovery.
2044                    If the node became disconnected or banned this will also
2045                    lead to an ip address failover but that is handled 
2046                    during recovery
2047                 */
2048                 if (disabled_flag_changed) {
2049                         rec->need_takeover_run = true;
2050                 }
2051         }
2052
2053         talloc_free(tmp_ctx);
2054 }
2055
2056 /*
2057   handler for when we need to push out flag changes ot all other nodes
2058 */
2059 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2060                             TDB_DATA data, void *private_data)
2061 {
2062         int ret;
2063         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2064         struct ctdb_node_map *nodemap=NULL;
2065         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2066         uint32_t recmaster;
2067         uint32_t *nodes;
2068
2069         /* find the recovery master */
2070         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2071         if (ret != 0) {
2072                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2073                 talloc_free(tmp_ctx);
2074                 return;
2075         }
2076
2077         /* read the node flags from the recmaster */
2078         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2079         if (ret != 0) {
2080                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2081                 talloc_free(tmp_ctx);
2082                 return;
2083         }
2084         if (c->pnn >= nodemap->num) {
2085                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2086                 talloc_free(tmp_ctx);
2087                 return;
2088         }
2089
2090         /* send the flags update to all connected nodes */
2091         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2092
2093         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2094                                       nodes, 0, CONTROL_TIMEOUT(),
2095                                       false, data,
2096                                       NULL, NULL,
2097                                       NULL) != 0) {
2098                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2099
2100                 talloc_free(tmp_ctx);
2101                 return;
2102         }
2103
2104         talloc_free(tmp_ctx);
2105 }
2106
2107
2108 struct verify_recmode_normal_data {
2109         uint32_t count;
2110         enum monitor_result status;
2111 };
2112
2113 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2114 {
2115         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2116
2117
2118         /* one more node has responded with recmode data*/
2119         rmdata->count--;
2120
2121         /* if we failed to get the recmode, then return an error and let
2122            the main loop try again.
2123         */
2124         if (state->state != CTDB_CONTROL_DONE) {
2125                 if (rmdata->status == MONITOR_OK) {
2126                         rmdata->status = MONITOR_FAILED;
2127                 }
2128                 return;
2129         }
2130
2131         /* if we got a response, then the recmode will be stored in the
2132            status field
2133         */
2134         if (state->status != CTDB_RECOVERY_NORMAL) {
2135                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2136                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2137         }
2138
2139         return;
2140 }
2141
2142
2143 /* verify that all nodes are in normal recovery mode */
2144 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2145 {
2146         struct verify_recmode_normal_data *rmdata;
2147         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2148         struct ctdb_client_control_state *state;
2149         enum monitor_result status;
2150         int j;
2151         
2152         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2153         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2154         rmdata->count  = 0;
2155         rmdata->status = MONITOR_OK;
2156
2157         /* loop over all active nodes and send an async getrecmode call to 
2158            them*/
2159         for (j=0; j<nodemap->num; j++) {
2160                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2161                         continue;
2162                 }
2163                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2164                                         CONTROL_TIMEOUT(), 
2165                                         nodemap->nodes[j].pnn);
2166                 if (state == NULL) {
2167                         /* we failed to send the control, treat this as 
2168                            an error and try again next iteration
2169                         */                      
2170                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2171                         talloc_free(mem_ctx);
2172                         return MONITOR_FAILED;
2173                 }
2174
2175                 /* set up the callback functions */
2176                 state->async.fn = verify_recmode_normal_callback;
2177                 state->async.private_data = rmdata;
2178
2179                 /* one more control to wait for to complete */
2180                 rmdata->count++;
2181         }
2182
2183
2184         /* now wait for up to the maximum number of seconds allowed
2185            or until all nodes we expect a response from has replied
2186         */
2187         while (rmdata->count > 0) {
2188                 event_loop_once(ctdb->ev);
2189         }
2190
2191         status = rmdata->status;
2192         talloc_free(mem_ctx);
2193         return status;
2194 }
2195
2196
2197 struct verify_recmaster_data {
2198         struct ctdb_recoverd *rec;
2199         uint32_t count;
2200         uint32_t pnn;
2201         enum monitor_result status;
2202 };
2203
2204 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2205 {
2206         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2207
2208
2209         /* one more node has responded with recmaster data*/
2210         rmdata->count--;
2211
2212         /* if we failed to get the recmaster, then return an error and let
2213            the main loop try again.
2214         */
2215         if (state->state != CTDB_CONTROL_DONE) {
2216                 if (rmdata->status == MONITOR_OK) {
2217                         rmdata->status = MONITOR_FAILED;
2218                 }
2219                 return;
2220         }
2221
2222         /* if we got a response, then the recmaster will be stored in the
2223            status field
2224         */
2225         if (state->status != rmdata->pnn) {
2226                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2227                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2228                 rmdata->status = MONITOR_ELECTION_NEEDED;
2229         }
2230
2231         return;
2232 }
2233
2234
2235 /* verify that all nodes agree that we are the recmaster */
2236 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2237 {
2238         struct ctdb_context *ctdb = rec->ctdb;
2239         struct verify_recmaster_data *rmdata;
2240         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2241         struct ctdb_client_control_state *state;
2242         enum monitor_result status;
2243         int j;
2244         
2245         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2246         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2247         rmdata->rec    = rec;
2248         rmdata->count  = 0;
2249         rmdata->pnn    = pnn;
2250         rmdata->status = MONITOR_OK;
2251
2252         /* loop over all active nodes and send an async getrecmaster call to 
2253            them*/
2254         for (j=0; j<nodemap->num; j++) {
2255                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2256                         continue;
2257                 }
2258                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2259                                         CONTROL_TIMEOUT(),
2260                                         nodemap->nodes[j].pnn);
2261                 if (state == NULL) {
2262                         /* we failed to send the control, treat this as 
2263                            an error and try again next iteration
2264                         */                      
2265                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2266                         talloc_free(mem_ctx);
2267                         return MONITOR_FAILED;
2268                 }
2269
2270                 /* set up the callback functions */
2271                 state->async.fn = verify_recmaster_callback;
2272                 state->async.private_data = rmdata;
2273
2274                 /* one more control to wait for to complete */
2275                 rmdata->count++;
2276         }
2277
2278
2279         /* now wait for up to the maximum number of seconds allowed
2280            or until all nodes we expect a response from has replied
2281         */
2282         while (rmdata->count > 0) {
2283                 event_loop_once(ctdb->ev);
2284         }
2285
2286         status = rmdata->status;
2287         talloc_free(mem_ctx);
2288         return status;
2289 }
2290
2291
2292 /* called to check that the allocation of public ip addresses is ok.
2293 */
2294 static int verify_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2295 {
2296         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2297         struct ctdb_all_public_ips *ips = NULL;
2298         struct ctdb_uptime *uptime1 = NULL;
2299         struct ctdb_uptime *uptime2 = NULL;
2300         int ret, j;
2301
2302         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2303                                 CTDB_CURRENT_NODE, &uptime1);
2304         if (ret != 0) {
2305                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2306                 talloc_free(mem_ctx);
2307                 return -1;
2308         }
2309
2310         /* read the ip allocation from the local node */
2311         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2312         if (ret != 0) {
2313                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2314                 talloc_free(mem_ctx);
2315                 return -1;
2316         }
2317
2318         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2319                                 CTDB_CURRENT_NODE, &uptime2);
2320         if (ret != 0) {
2321                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2322                 talloc_free(mem_ctx);
2323                 return -1;
2324         }
2325
2326         /* skip the check if the startrecovery time has changed */
2327         if (timeval_compare(&uptime1->last_recovery_started,
2328                             &uptime2->last_recovery_started) != 0) {
2329                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2330                 talloc_free(mem_ctx);
2331                 return 0;
2332         }
2333
2334         /* skip the check if the endrecovery time has changed */
2335         if (timeval_compare(&uptime1->last_recovery_finished,
2336                             &uptime2->last_recovery_finished) != 0) {
2337                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2338                 talloc_free(mem_ctx);
2339                 return 0;
2340         }
2341
2342         /* skip the check if we have started but not finished recovery */
2343         if (timeval_compare(&uptime1->last_recovery_finished,
2344                             &uptime1->last_recovery_started) != 1) {
2345                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2346                 talloc_free(mem_ctx);
2347
2348                 return 0;
2349         }
2350
2351         /* verify that we have the ip addresses we should have
2352            and we dont have ones we shouldnt have.
2353            if we find an inconsistency we set recmode to
2354            active on the local node and wait for the recmaster
2355            to do a full blown recovery
2356         */
2357         for (j=0; j<ips->num; j++) {
2358                 if (ips->ips[j].pnn == pnn) {
2359                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2360                                 struct takeover_run_reply rd;
2361                                 TDB_DATA data;
2362
2363                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2364                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2365
2366                                 rd.pnn   = ctdb->pnn;
2367                                 rd.srvid = 0;
2368                                 data.dptr = (uint8_t *)&rd;
2369                                 data.dsize = sizeof(rd);
2370
2371                                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2372                                 if (ret != 0) {
2373                                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2374                                 }
2375                         }
2376                 } else {
2377                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2378                                 struct takeover_run_reply rd;
2379                                 TDB_DATA data;
2380
2381                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2382                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2383
2384                                 rd.pnn   = ctdb->pnn;
2385                                 rd.srvid = 0;
2386                                 data.dptr = (uint8_t *)&rd;
2387                                 data.dsize = sizeof(rd);
2388
2389                                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2390                                 if (ret != 0) {
2391                                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2392                                 }
2393                         }
2394                 }
2395         }
2396
2397         talloc_free(mem_ctx);
2398         return 0;
2399 }
2400
2401
2402 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2403 {
2404         struct ctdb_node_map **remote_nodemaps = callback_data;
2405
2406         if (node_pnn >= ctdb->num_nodes) {
2407                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2408                 return;
2409         }
2410
2411         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2412
2413 }
2414
2415 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2416         struct ctdb_node_map *nodemap,
2417         struct ctdb_node_map **remote_nodemaps)
2418 {
2419         uint32_t *nodes;
2420
2421         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2422         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2423                                         nodes, 0,
2424                                         CONTROL_TIMEOUT(), false, tdb_null,
2425                                         async_getnodemap_callback,
2426                                         NULL,
2427                                         remote_nodemaps) != 0) {
2428                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2429
2430                 return -1;
2431         }
2432
2433         return 0;
2434 }
2435
2436 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2437 struct ctdb_check_reclock_state {
2438         struct ctdb_context *ctdb;
2439         struct timeval start_time;
2440         int fd[2];
2441         pid_t child;
2442         struct timed_event *te;
2443         struct fd_event *fde;
2444         enum reclock_child_status status;
2445 };
2446
2447 /* when we free the reclock state we must kill any child process.
2448 */
2449 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2450 {
2451         struct ctdb_context *ctdb = state->ctdb;
2452
2453         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2454
2455         if (state->fd[0] != -1) {
2456                 close(state->fd[0]);
2457                 state->fd[0] = -1;
2458         }
2459         if (state->fd[1] != -1) {
2460                 close(state->fd[1]);
2461                 state->fd[1] = -1;
2462         }
2463         kill(state->child, SIGKILL);
2464         return 0;
2465 }
2466
2467 /*
2468   called if our check_reclock child times out. this would happen if
2469   i/o to the reclock file blocks.
2470  */
2471 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2472                                          struct timeval t, void *private_data)
2473 {
2474         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2475                                            struct ctdb_check_reclock_state);
2476
2477         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2478         state->status = RECLOCK_TIMEOUT;
2479 }
2480
2481 /* this is called when the child process has completed checking the reclock
2482    file and has written data back to us through the pipe.
2483 */
2484 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2485                              uint16_t flags, void *private_data)
2486 {
2487         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2488                                              struct ctdb_check_reclock_state);
2489         char c = 0;
2490         int ret;
2491
2492         /* we got a response from our child process so we can abort the
2493            timeout.
2494         */
2495         talloc_free(state->te);
2496         state->te = NULL;
2497
2498         ret = read(state->fd[0], &c, 1);
2499         if (ret != 1 || c != RECLOCK_OK) {
2500                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2501                 state->status = RECLOCK_FAILED;
2502
2503                 return;
2504         }
2505
2506         state->status = RECLOCK_OK;
2507         return;
2508 }
2509
2510 static int check_recovery_lock(struct ctdb_context *ctdb)
2511 {
2512         int ret;
2513         struct ctdb_check_reclock_state *state;
2514         pid_t parent = getpid();
2515
2516         if (ctdb->recovery_lock_fd == -1) {
2517                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2518                 return -1;
2519         }
2520
2521         state = talloc(ctdb, struct ctdb_check_reclock_state);
2522         CTDB_NO_MEMORY(ctdb, state);
2523
2524         state->ctdb = ctdb;
2525         state->start_time = timeval_current();
2526         state->status = RECLOCK_CHECKING;
2527         state->fd[0] = -1;
2528         state->fd[1] = -1;
2529
2530         ret = pipe(state->fd);
2531         if (ret != 0) {
2532                 talloc_free(state);
2533                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2534                 return -1;
2535         }
2536
2537         state->child = fork();
2538         if (state->child == (pid_t)-1) {
2539                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2540                 close(state->fd[0]);
2541                 state->fd[0] = -1;
2542                 close(state->fd[1]);
2543                 state->fd[1] = -1;
2544                 talloc_free(state);
2545                 return -1;
2546         }
2547
2548         if (state->child == 0) {
2549                 char cc = RECLOCK_OK;
2550                 close(state->fd[0]);
2551                 state->fd[0] = -1;
2552
2553                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2554                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2555                         cc = RECLOCK_FAILED;
2556                 }
2557
2558                 write(state->fd[1], &cc, 1);
2559                 /* make sure we die when our parent dies */
2560                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2561                         sleep(5);
2562                         write(state->fd[1], &cc, 1);
2563                 }
2564                 _exit(0);
2565         }
2566         close(state->fd[1]);
2567         state->fd[1] = -1;
2568         set_close_on_exec(state->fd[0]);
2569
2570         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2571
2572         talloc_set_destructor(state, check_reclock_destructor);
2573
2574         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2575                                     ctdb_check_reclock_timeout, state);
2576         if (state->te == NULL) {
2577                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2578                 talloc_free(state);
2579                 return -1;
2580         }
2581
2582         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2583                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2584                                 reclock_child_handler,
2585                                 (void *)state);
2586
2587         if (state->fde == NULL) {
2588                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2589                 talloc_free(state);
2590                 return -1;
2591         }
2592
2593         while (state->status == RECLOCK_CHECKING) {
2594                 event_loop_once(ctdb->ev);
2595         }
2596
2597         if (state->status == RECLOCK_FAILED) {
2598                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2599                 close(ctdb->recovery_lock_fd);
2600                 ctdb->recovery_lock_fd = -1;
2601                 talloc_free(state);
2602                 return -1;
2603         }
2604
2605         talloc_free(state);
2606         return 0;
2607 }
2608
2609 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2610 {
2611         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2612         const char *reclockfile;
2613
2614         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2615                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2616                 talloc_free(tmp_ctx);
2617                 return -1;      
2618         }
2619
2620         if (reclockfile == NULL) {
2621                 if (ctdb->recovery_lock_file != NULL) {
2622                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2623                         talloc_free(ctdb->recovery_lock_file);
2624                         ctdb->recovery_lock_file = NULL;
2625                         if (ctdb->recovery_lock_fd != -1) {
2626                                 close(ctdb->recovery_lock_fd);
2627                                 ctdb->recovery_lock_fd = -1;
2628                         }
2629                 }
2630                 ctdb->tunable.verify_recovery_lock = 0;
2631                 talloc_free(tmp_ctx);
2632                 return 0;
2633         }
2634
2635         if (ctdb->recovery_lock_file == NULL) {
2636                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2637                 if (ctdb->recovery_lock_fd != -1) {
2638                         close(ctdb->recovery_lock_fd);
2639                         ctdb->recovery_lock_fd = -1;
2640                 }
2641                 talloc_free(tmp_ctx);
2642                 return 0;
2643         }
2644
2645
2646         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2647                 talloc_free(tmp_ctx);
2648                 return 0;
2649         }
2650
2651         talloc_free(ctdb->recovery_lock_file);
2652         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2653         ctdb->tunable.verify_recovery_lock = 0;
2654         if (ctdb->recovery_lock_fd != -1) {
2655                 close(ctdb->recovery_lock_fd);
2656                 ctdb->recovery_lock_fd = -1;
2657         }
2658
2659         talloc_free(tmp_ctx);
2660         return 0;
2661 }
2662                 
2663 /*
2664   the main monitoring loop
2665  */
2666 static void monitor_cluster(struct ctdb_context *ctdb)
2667 {
2668         uint32_t pnn;
2669         TALLOC_CTX *mem_ctx=NULL;
2670         struct ctdb_node_map *nodemap=NULL;
2671         struct ctdb_node_map *recmaster_nodemap=NULL;
2672         struct ctdb_node_map **remote_nodemaps=NULL;
2673         struct ctdb_vnn_map *vnnmap=NULL;
2674         struct ctdb_vnn_map *remote_vnnmap=NULL;
2675         int32_t debug_level;
2676         int i, j, ret;
2677         struct ctdb_recoverd *rec;
2678
2679         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2680
2681         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2682         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2683
2684         rec->ctdb = ctdb;
2685
2686         rec->priority_time = timeval_current();
2687
2688         /* register a message port for sending memory dumps */
2689         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2690
2691         /* register a message port for recovery elections */
2692         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2693
2694         /* when nodes are disabled/enabled */
2695         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2696
2697         /* when we are asked to puch out a flag change */
2698         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2699
2700         /* register a message port for vacuum fetch */
2701         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2702
2703         /* register a message port for reloadnodes  */
2704         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2705
2706         /* register a message port for performing a takeover run */
2707         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2708
2709         /* register a message port for disabling the ip check for a short while */
2710         ctdb_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
2711
2712 again:
2713         if (mem_ctx) {
2714                 talloc_free(mem_ctx);
2715                 mem_ctx = NULL;
2716         }
2717         mem_ctx = talloc_new(ctdb);
2718         if (!mem_ctx) {
2719                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2720                 exit(-1);
2721         }
2722
2723         /* we only check for recovery once every second */
2724         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2725
2726         /* verify that the main daemon is still running */
2727         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2728                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2729                 exit(-1);
2730         }
2731
2732         /* ping the local daemon to tell it we are alive */
2733         ctdb_ctrl_recd_ping(ctdb);
2734
2735         if (rec->election_timeout) {
2736                 /* an election is in progress */
2737                 goto again;
2738         }
2739
2740         /* read the debug level from the parent and update locally */
2741         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2742         if (ret !=0) {
2743                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2744                 goto again;
2745         }
2746         LogLevel = debug_level;
2747
2748
2749         /* We must check if we need to ban a node here but we want to do this
2750            as early as possible so we dont wait until we have pulled the node
2751            map from the local node. thats why we have the hardcoded value 20
2752         */
2753         for (i=0; i<ctdb->num_nodes; i++) {
2754                 struct ctdb_banning_state *ban_state;
2755
2756                 if (ctdb->nodes[i]->ban_state == NULL) {
2757                         continue;
2758                 }
2759                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2760                 if (ban_state->count < 20) {
2761                         continue;
2762                 }
2763                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2764                         ctdb->nodes[i]->pnn, ban_state->count,
2765                         ctdb->tunable.recovery_ban_period));
2766                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2767                 ban_state->count = 0;
2768         }
2769
2770         /* get relevant tunables */
2771         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2772         if (ret != 0) {
2773                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2774                 goto again;
2775         }
2776
2777         /* get the current recovery lock file from the server */
2778         if (update_recovery_lock_file(ctdb) != 0) {
2779                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2780                 goto again;
2781         }
2782
2783         /* Make sure that if recovery lock verification becomes disabled when
2784            we close the file
2785         */
2786         if (ctdb->tunable.verify_recovery_lock == 0) {
2787                 if (ctdb->recovery_lock_fd != -1) {
2788                         close(ctdb->recovery_lock_fd);
2789                         ctdb->recovery_lock_fd = -1;
2790                 }
2791         }
2792
2793         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2794         if (pnn == (uint32_t)-1) {
2795                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2796                 goto again;
2797         }
2798
2799         /* get the vnnmap */
2800         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2801         if (ret != 0) {
2802                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2803                 goto again;
2804         }
2805
2806
2807         /* get number of nodes */
2808         if (rec->nodemap) {
2809                 talloc_free(rec->nodemap);
2810                 rec->nodemap = NULL;
2811                 nodemap=NULL;
2812         }
2813         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2814         if (ret != 0) {
2815                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2816                 goto again;
2817         }
2818         nodemap = rec->nodemap;
2819
2820         /* check which node is the recovery master */
2821         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2822         if (ret != 0) {
2823                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2824                 goto again;
2825         }
2826
2827         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
2828         if (rec->recmaster != pnn) {
2829                 if (rec->ip_reallocate_ctx != NULL) {
2830                         talloc_free(rec->ip_reallocate_ctx);
2831                         rec->ip_reallocate_ctx = NULL;
2832                         rec->reallocate_callers = NULL;
2833                 }
2834         }
2835         /* if there are takeovers requested, perform it and notify the waiters */
2836         if (rec->reallocate_callers) {
2837                 process_ipreallocate_requests(ctdb, rec);
2838         }
2839
2840         if (rec->recmaster == (uint32_t)-1) {
2841                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2842                 force_election(rec, pnn, nodemap);
2843                 goto again;
2844         }
2845
2846
2847         /* if the local daemon is STOPPED, we verify that the databases are
2848            also frozen and thet the recmode is set to active 
2849         */
2850         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
2851                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2852                 if (ret != 0) {
2853                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
2854                 }
2855                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2856                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
2857
2858                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
2859                         if (ret != 0) {
2860                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
2861                                 goto again;
2862                         }
2863                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2864                         if (ret != 0) {
2865                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
2866
2867                                 goto again;
2868                         }
2869                         goto again;
2870                 }
2871         }
2872         /* If the local node is stopped, verify we are not the recmaster 
2873            and yield this role if so
2874         */
2875         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
2876                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
2877                 force_election(rec, pnn, nodemap);
2878                 goto again;
2879         }
2880         
2881         /* check that we (recovery daemon) and the local ctdb daemon
2882            agrees on whether we are banned or not
2883         */
2884 //qqq
2885
2886         /* remember our own node flags */
2887         rec->node_flags = nodemap->nodes[pnn].flags;
2888
2889         /* count how many active nodes there are */
2890         rec->num_active    = 0;
2891         rec->num_connected = 0;
2892         for (i=0; i<nodemap->num; i++) {
2893                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2894                         rec->num_active++;
2895                 }
2896                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2897                         rec->num_connected++;
2898                 }
2899         }
2900
2901
2902         /* verify that the recmaster node is still active */
2903         for (j=0; j<nodemap->num; j++) {
2904                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2905                         break;
2906                 }
2907         }
2908
2909         if (j == nodemap->num) {
2910                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2911                 force_election(rec, pnn, nodemap);
2912                 goto again;
2913         }
2914
2915         /* if recovery master is disconnected we must elect a new recmaster */
2916         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2917                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2918                 force_election(rec, pnn, nodemap);
2919                 goto again;
2920         }
2921
2922         /* grap the nodemap from the recovery master to check if it is banned */
2923         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2924                                    mem_ctx, &recmaster_nodemap);
2925         if (ret != 0) {
2926                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2927                           nodemap->nodes[j].pnn));
2928                 goto again;
2929         }
2930
2931
2932         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2933                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2934                 force_election(rec, pnn, nodemap);
2935                 goto again;
2936         }
2937
2938
2939         /* verify that we have all ip addresses we should have and we dont
2940          * have addresses we shouldnt have.
2941          */ 
2942         if (ctdb->do_checkpublicip) {
2943                 if (rec->ip_check_disable_ctx == NULL) {
2944                         if (verify_ip_allocation(ctdb, rec, pnn) != 0) {
2945                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
2946                         }
2947                 }
2948         }
2949
2950
2951         /* if we are not the recmaster then we do not need to check
2952            if recovery is needed
2953          */
2954         if (pnn != rec->recmaster) {
2955                 goto again;
2956         }
2957
2958
2959         /* ensure our local copies of flags are right */
2960         ret = update_local_flags(rec, nodemap);
2961         if (ret == MONITOR_ELECTION_NEEDED) {
2962                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2963                 force_election(rec, pnn, nodemap);
2964                 goto again;
2965         }
2966         if (ret != MONITOR_OK) {
2967                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2968                 goto again;
2969         }
2970
2971         /* update the list of public ips that a node can handle for
2972            all connected nodes
2973         */
2974         if (ctdb->num_nodes != nodemap->num) {
2975                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2976                 reload_nodes_file(ctdb);
2977                 goto again;
2978         }
2979         for (j=0; j<nodemap->num; j++) {
2980                 /* release any existing data */
2981                 if (ctdb->nodes[j]->public_ips) {
2982                         talloc_free(ctdb->nodes[j]->public_ips);
2983                         ctdb->nodes[j]->public_ips = NULL;
2984                 }
2985
2986                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2987                         continue;
2988                 }
2989
2990                 /* grab a new shiny list of public ips from the node */
2991                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
2992                         ctdb->nodes[j]->pnn, 
2993                         ctdb->nodes,
2994                         &ctdb->nodes[j]->public_ips)) {
2995                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
2996                                 ctdb->nodes[j]->pnn));
2997                         goto again;
2998                 }
2999         }
3000
3001
3002         /* verify that all active nodes agree that we are the recmaster */
3003         switch (verify_recmaster(rec, nodemap, pnn)) {
3004         case MONITOR_RECOVERY_NEEDED:
3005                 /* can not happen */
3006                 goto again;
3007         case MONITOR_ELECTION_NEEDED:
3008                 force_election(rec, pnn, nodemap);
3009                 goto again;
3010         case MONITOR_OK:
3011                 break;
3012         case MONITOR_FAILED:
3013                 goto again;
3014         }
3015
3016
3017         if (rec->need_recovery) {
3018                 /* a previous recovery didn't finish */
3019                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3020                 goto again;             
3021         }
3022
3023         /* verify that all active nodes are in normal mode 
3024            and not in recovery mode 
3025         */
3026         switch (verify_recmode(ctdb, nodemap)) {
3027         case MONITOR_RECOVERY_NEEDED:
3028                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3029                 goto again;
3030         case MONITOR_FAILED:
3031                 goto again;
3032         case MONITOR_ELECTION_NEEDED:
3033                 /* can not happen */
3034         case MONITOR_OK:
3035                 break;
3036         }
3037
3038
3039         if (ctdb->tunable.verify_recovery_lock != 0) {
3040                 /* we should have the reclock - check its not stale */
3041                 ret = check_recovery_lock(ctdb);
3042                 if (ret != 0) {
3043                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3044                         ctdb_set_culprit(rec, ctdb->pnn);
3045                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3046                         goto again;
3047                 }
3048         }
3049
3050         /* get the nodemap for all active remote nodes
3051          */
3052         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3053         if (remote_nodemaps == NULL) {
3054                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3055                 goto again;
3056         }
3057         for(i=0; i<nodemap->num; i++) {
3058                 remote_nodemaps[i] = NULL;
3059         }
3060         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3061                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3062                 goto again;
3063         } 
3064
3065         /* verify that all other nodes have the same nodemap as we have
3066         */
3067         for (j=0; j<nodemap->num; j++) {
3068                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3069                         continue;
3070                 }
3071
3072                 if (remote_nodemaps[j] == NULL) {
3073                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3074                         ctdb_set_culprit(rec, j);
3075
3076                         goto again;
3077                 }
3078
3079                 /* if the nodes disagree on how many nodes there are
3080                    then this is a good reason to try recovery
3081                  */
3082                 if (remote_nodemaps[j]->num != nodemap->num) {
3083                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3084                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3085                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3086                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3087                         goto again;
3088                 }
3089
3090                 /* if the nodes disagree on which nodes exist and are
3091                    active, then that is also a good reason to do recovery
3092                  */
3093                 for (i=0;i<nodemap->num;i++) {
3094                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3095                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3096                                           nodemap->nodes[j].pnn, i, 
3097                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3098                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3099                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3100                                             vnnmap);
3101                                 goto again;
3102                         }
3103                 }
3104
3105                 /* verify the flags are consistent
3106                 */
3107                 for (i=0; i<nodemap->num; i++) {
3108                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3109                                 continue;
3110                         }
3111                         
3112                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3113                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3114                                   nodemap->nodes[j].pnn, 
3115                                   nodemap->nodes[i].pnn, 
3116                                   remote_nodemaps[j]->nodes[i].flags,
3117                                   nodemap->nodes[j].flags));
3118                                 if (i == j) {
3119                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3120                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3121                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3122                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3123                                                     vnnmap);
3124                                         goto again;
3125                                 } else {
3126                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3127                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3128                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3129                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3130                                                     vnnmap);
3131                                         goto again;
3132                                 }
3133                         }
3134                 }
3135         }
3136
3137
3138         /* there better be the same number of lmasters in the vnn map
3139            as there are active nodes or we will have to do a recovery
3140          */
3141         if (vnnmap->size != rec->num_active) {
3142                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3143                           vnnmap->size, rec->num_active));
3144                 ctdb_set_culprit(rec, ctdb->pnn);
3145                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3146                 goto again;
3147         }
3148
3149         /* verify that all active nodes in the nodemap also exist in 
3150            the vnnmap.
3151          */
3152         for (j=0; j<nodemap->num; j++) {
3153                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3154                         continue;
3155                 }
3156                 if (nodemap->nodes[j].pnn == pnn) {
3157                         continue;
3158                 }
3159
3160                 for (i=0; i<vnnmap->size; i++) {
3161                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3162                                 break;
3163                         }
3164                 }
3165                 if (i == vnnmap->size) {
3166                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3167                                   nodemap->nodes[j].pnn));
3168                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3169                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3170                         goto again;
3171                 }
3172         }
3173
3174         
3175         /* verify that all other nodes have the same vnnmap
3176            and are from the same generation
3177          */
3178         for (j=0; j<nodemap->num; j++) {
3179                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3180                         continue;
3181                 }
3182                 if (nodemap->nodes[j].pnn == pnn) {
3183                         continue;
3184                 }
3185
3186                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3187                                           mem_ctx, &remote_vnnmap);
3188                 if (ret != 0) {
3189                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3190                                   nodemap->nodes[j].pnn));
3191                         goto again;
3192                 }
3193
3194                 /* verify the vnnmap generation is the same */
3195                 if (vnnmap->generation != remote_vnnmap->generation) {
3196                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3197                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3198                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3199                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3200                         goto again;
3201                 }
3202
3203                 /* verify the vnnmap size is the same */
3204                 if (vnnmap->size != remote_vnnmap->size) {
3205                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3206                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3207                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3208                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3209                         goto again;
3210                 }
3211
3212                 /* verify the vnnmap is the same */
3213                 for (i=0;i<vnnmap->size;i++) {
3214                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3215                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3216                                           nodemap->nodes[j].pnn));
3217                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3218                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3219                                             vnnmap);
3220                                 goto again;
3221                         }
3222                 }
3223         }
3224
3225         /* we might need to change who has what IP assigned */
3226         if (rec->need_takeover_run) {
3227                 rec->need_takeover_run = false;
3228
3229                 /* execute the "startrecovery" event script on all nodes */
3230                 ret = run_startrecovery_eventscript(rec, nodemap);
3231                 if (ret!=0) {
3232                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3233                         ctdb_set_culprit(rec, ctdb->pnn);
3234                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3235                 }
3236
3237                 ret = ctdb_takeover_run(ctdb, nodemap);
3238                 if (ret != 0) {
3239                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3240                         ctdb_set_culprit(rec, ctdb->pnn);
3241                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3242                 }
3243
3244                 /* execute the "recovered" event script on all nodes */
3245                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3246 #if 0
3247 // we cant check whether the event completed successfully
3248 // since this script WILL fail if the node is in recovery mode
3249 // and if that race happens, the code here would just cause a second
3250 // cascading recovery.
3251                 if (ret!=0) {
3252                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3253                         ctdb_set_culprit(rec, ctdb->pnn);
3254                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3255                 }
3256 #endif
3257         }
3258
3259
3260         goto again;
3261
3262 }
3263
3264 /*
3265   event handler for when the main ctdbd dies
3266  */
3267 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3268                                  uint16_t flags, void *private_data)
3269 {
3270         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3271         _exit(1);
3272 }
3273
3274 /*
3275   called regularly to verify that the recovery daemon is still running
3276  */
3277 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3278                               struct timeval yt, void *p)
3279 {
3280         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3281
3282         if (kill(ctdb->recoverd_pid, 0) != 0) {
3283                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3284
3285                 ctdb_stop_recoverd(ctdb);
3286                 ctdb_stop_keepalive(ctdb);
3287                 ctdb_stop_monitoring(ctdb);
3288                 ctdb_release_all_ips(ctdb);
3289                 if (ctdb->methods != NULL) {
3290                         ctdb->methods->shutdown(ctdb);
3291                 }
3292                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3293
3294                 exit(10);       
3295         }
3296
3297         event_add_timed(ctdb->ev, ctdb, 
3298                         timeval_current_ofs(30, 0),
3299                         ctdb_check_recd, ctdb);
3300 }
3301
3302 static void recd_sig_child_handler(struct event_context *ev,
3303         struct signal_event *se, int signum, int count,
3304         void *dont_care, 
3305         void *private_data)
3306 {
3307 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3308         int status;
3309         pid_t pid = -1;
3310
3311         while (pid != 0) {
3312                 pid = waitpid(-1, &status, WNOHANG);
3313                 if (pid == -1) {
3314                         if (errno != ECHILD) {
3315                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3316                         }
3317                         return;
3318                 }
3319                 if (pid > 0) {
3320                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3321                 }
3322         }
3323 }
3324
3325 /*
3326   startup the recovery daemon as a child of the main ctdb daemon
3327  */
3328 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3329 {
3330         int fd[2];
3331         struct signal_event *se;
3332
3333         if (pipe(fd) != 0) {
3334                 return -1;
3335         }
3336
3337         ctdb->ctdbd_pid = getpid();
3338
3339         ctdb->recoverd_pid = fork();
3340         if (ctdb->recoverd_pid == -1) {
3341                 return -1;
3342         }
3343         
3344         if (ctdb->recoverd_pid != 0) {
3345                 close(fd[0]);
3346                 event_add_timed(ctdb->ev, ctdb, 
3347                                 timeval_current_ofs(30, 0),
3348                                 ctdb_check_recd, ctdb);
3349                 return 0;
3350         }
3351
3352         close(fd[1]);
3353
3354         srandom(getpid() ^ time(NULL));
3355
3356         if (switch_from_server_to_client(ctdb) != 0) {
3357                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3358                 exit(1);
3359         }
3360
3361         DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3362
3363         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3364                      ctdb_recoverd_parent, &fd[0]);     
3365
3366         /* set up a handler to pick up sigchld */
3367         se = event_add_signal(ctdb->ev, ctdb,
3368                                      SIGCHLD, 0,
3369                                      recd_sig_child_handler,
3370                                      ctdb);
3371         if (se == NULL) {
3372                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3373                 exit(1);
3374         }
3375
3376         monitor_cluster(ctdb);
3377
3378         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3379         return -1;
3380 }
3381
3382 /*
3383   shutdown the recovery daemon
3384  */
3385 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3386 {
3387         if (ctdb->recoverd_pid == 0) {
3388                 return;
3389         }
3390
3391         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3392         kill(ctdb->recoverd_pid, SIGTERM);
3393 }