ctdb: pass TDB_DISALLOW_NESTING to all tdb_open/tdb_wrap_open calls
[metze/ctdb/wip.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67 };
68
69 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
70 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
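/* both macros give an absolute deadline: the current time plus the
   recover_timeout resp. recover_interval tunable */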
71
72
73 /*
74   ban a node for a period of time
75  */
76 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
77 {
78         int ret;
79         struct ctdb_context *ctdb = rec->ctdb;
80         struct ctdb_ban_time bantime;
81        
82         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
83
84         if (!ctdb_validate_pnn(ctdb, pnn)) {
85                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
86                 return;
87         }
88
89         bantime.pnn  = pnn;
90         bantime.time = ban_time;
91
92         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
93         if (ret != 0) {
94                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
95                 return;
96         }
97
98 }
99
100 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
101
102
103 /*
104   run the "recovered" eventscript on all nodes
105  */
106 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
107 {
108         TALLOC_CTX *tmp_ctx;
109         uint32_t *nodes;
110
111         tmp_ctx = talloc_new(ctdb);
112         CTDB_NO_MEMORY(ctdb, tmp_ctx);
113
114         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
115         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
116                                         nodes, 0,
117                                         CONTROL_TIMEOUT(), false, tdb_null,
118                                         NULL, NULL,
119                                         NULL) != 0) {
120                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
121
122                 talloc_free(tmp_ctx);
123                 return -1;
124         }
125
126         talloc_free(tmp_ctx);
127         return 0;
128 }
129
130 /*
131   remember the trouble maker
132  */
133 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
134 {
135         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
136         struct ctdb_banning_state *ban_state;
137
138         if (culprit >= ctdb->num_nodes) {
139                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
140                 return;
141         }
142
143         if (ctdb->nodes[culprit]->ban_state == NULL) {
144                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
145                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
146
147                 
148         }
149         ban_state = ctdb->nodes[culprit]->ban_state;
150         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
151                 /* this was the first time in a long while this node
152                    misbehaved so we will forgive any old transgressions.
153                 */
154                 ban_state->count = 0;
155         }
156
157         ban_state->count += count;
158         ban_state->last_reported_time = timeval_current();
159         rec->last_culprit_node = culprit;
160 }
161
162 /*
163   remember the trouble maker
164  */
165 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
166 {
167         ctdb_set_culprit_count(rec, culprit, 1);
168 }
169
170
171 /* this callback is called for every node that failed to execute the
172    start recovery event
173 */
174 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
175 {
176         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
177
178         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
179
180         ctdb_set_culprit(rec, node_pnn);
181 }
182
183 /*
184   run the "startrecovery" eventscript on all nodes
185  */
186 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
187 {
188         TALLOC_CTX *tmp_ctx;
189         uint32_t *nodes;
190         struct ctdb_context *ctdb = rec->ctdb;
191
192         tmp_ctx = talloc_new(ctdb);
193         CTDB_NO_MEMORY(ctdb, tmp_ctx);
194
195         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
196         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
197                                         nodes, 0,
198                                         CONTROL_TIMEOUT(), false, tdb_null,
199                                         NULL,
200                                         startrecovery_fail_callback,
201                                         rec) != 0) {
202                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
203                 talloc_free(tmp_ctx);
204                 return -1;
205         }
206
207         talloc_free(tmp_ctx);
208         return 0;
209 }
210
211 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
212 {
213         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
214                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
215                 return;
216         }
217         if (node_pnn < ctdb->num_nodes) {
218                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
219         }
220 }
221
222 /*
223   update the node capabilities for all connected nodes
224  */
225 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
226 {
227         uint32_t *nodes;
228         TALLOC_CTX *tmp_ctx;
229
230         tmp_ctx = talloc_new(ctdb);
231         CTDB_NO_MEMORY(ctdb, tmp_ctx);
232
233         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
234         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
235                                         nodes, 0,
236                                         CONTROL_TIMEOUT(),
237                                         false, tdb_null,
238                                         async_getcap_callback, NULL,
239                                         NULL) != 0) {
240                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
241                 talloc_free(tmp_ctx);
242                 return -1;
243         }
244
245         talloc_free(tmp_ctx);
246         return 0;
247 }
248
249 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
250 {
251         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
252
253         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
254         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
255 }
256
257 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
258 {
259         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
260
261         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
262         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
263 }
264
265 /*
266   change recovery mode on all nodes
267  */
268 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
269 {
270         TDB_DATA data;
271         uint32_t *nodes;
272         TALLOC_CTX *tmp_ctx;
273
274         tmp_ctx = talloc_new(ctdb);
275         CTDB_NO_MEMORY(ctdb, tmp_ctx);
276
277         /* freeze all nodes */
278         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
279         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
280                 int i;
281
282                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
283                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
284                                                 nodes, i,
285                                                 CONTROL_TIMEOUT(),
286                                                 false, tdb_null,
287                                                 NULL,
288                                                 set_recmode_fail_callback,
289                                                 rec) != 0) {
290                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
291                                 talloc_free(tmp_ctx);
292                                 return -1;
293                         }
294                 }
295         }
296
297
298         data.dsize = sizeof(uint32_t);
299         data.dptr = (unsigned char *)&rec_mode;
300
301         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
302                                         nodes, 0,
303                                         CONTROL_TIMEOUT(),
304                                         false, data,
305                                         NULL, NULL,
306                                         NULL) != 0) {
307                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
308                 talloc_free(tmp_ctx);
309                 return -1;
310         }
311
312         talloc_free(tmp_ctx);
313         return 0;
314 }
315
316 /*
317   change recovery master on all nodes
318  */
319 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
320 {
321         TDB_DATA data;
322         TALLOC_CTX *tmp_ctx;
323         uint32_t *nodes;
324
325         tmp_ctx = talloc_new(ctdb);
326         CTDB_NO_MEMORY(ctdb, tmp_ctx);
327
328         data.dsize = sizeof(uint32_t);
329         data.dptr = (unsigned char *)&pnn;
330
331         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
332         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
333                                         nodes, 0,
334                                         CONTROL_TIMEOUT(), false, data,
335                                         NULL, NULL,
336                                         NULL) != 0) {
337                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
338                 talloc_free(tmp_ctx);
339                 return -1;
340         }
341
342         talloc_free(tmp_ctx);
343         return 0;
344 }
345
346 /* update all remote nodes to use the same db priority that we have.
347    This can fail if the remote node has not yet been upgraded to
348    support this function, so we always return success and never fail
349    a recovery if this call fails.
350 */
351 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
352         struct ctdb_node_map *nodemap, 
353         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
354 {
355         int db;
356         uint32_t *nodes;
357
358         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
359
360         /* step through all local databases */
361         for (db=0; db<dbmap->num;db++) {
362                 TDB_DATA data;
363                 struct ctdb_db_priority db_prio;
364                 int ret;
365
366                 db_prio.db_id     = dbmap->dbs[db].dbid;
367                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
368                 if (ret != 0) {
369                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
370                         continue;
371                 }
372
373                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
374
375                 data.dptr  = (uint8_t *)&db_prio;
376                 data.dsize = sizeof(db_prio);
377
378                 if (ctdb_client_async_control(ctdb,
379                                         CTDB_CONTROL_SET_DB_PRIORITY,
380                                         nodes, 0,
381                                         CONTROL_TIMEOUT(), false, data,
382                                         NULL, NULL,
383                                         NULL) != 0) {
384                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
385                 }
386         }
387
388         return 0;
389 }                       
390
391 /*
392   ensure all other nodes have attached to any databases that we have
393  */
394 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
395                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
396 {
397         int i, j, db, ret;
398         struct ctdb_dbid_map *remote_dbmap;
399
400         /* verify that all other nodes have all our databases */
401         for (j=0; j<nodemap->num; j++) {
402                 /* we don't need to check ourselves */
403                 if (nodemap->nodes[j].pnn == pnn) {
404                         continue;
405                 }
406                 /* don't check nodes that are unavailable */
407                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
408                         continue;
409                 }
410
411                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
412                                          mem_ctx, &remote_dbmap);
413                 if (ret != 0) {
414                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
415                         return -1;
416                 }
417
418                 /* step through all local databases */
419                 for (db=0; db<dbmap->num;db++) {
420                         const char *name;
421
422
423                         for (i=0;i<remote_dbmap->num;i++) {
424                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
425                                         break;
426                                 }
427                         }
428                         /* the remote node already has this database */
429                         if (i!=remote_dbmap->num) {
430                                 continue;
431                         }
432                         /* ok so we need to create this database */
433                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
434                                             mem_ctx, &name);
435                         if (ret != 0) {
436                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
437                                 return -1;
438                         }
439                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
440                                            mem_ctx, name, dbmap->dbs[db].persistent);
441                         if (ret != 0) {
442                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
443                                 return -1;
444                         }
445                 }
446         }
447
448         return 0;
449 }
450
451
452 /*
453   ensure we are attached to any databases that anyone else is attached to
454  */
455 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
456                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
457 {
458         int i, j, db, ret;
459         struct ctdb_dbid_map *remote_dbmap;
460
461         /* verify that we have all databases any other node has */
462         for (j=0; j<nodemap->num; j++) {
463                 /* we don't need to check ourselves */
464                 if (nodemap->nodes[j].pnn == pnn) {
465                         continue;
466                 }
467                 /* don't check nodes that are unavailable */
468                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
469                         continue;
470                 }
471
472                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
473                                          mem_ctx, &remote_dbmap);
474                 if (ret != 0) {
475                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
476                         return -1;
477                 }
478
479                 /* step through all databases on the remote node */
480                 for (db=0; db<remote_dbmap->num;db++) {
481                         const char *name;
482
483                         for (i=0;i<(*dbmap)->num;i++) {
484                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
485                                         break;
486                                 }
487                         }
488                         /* we already have this db locally */
489                         if (i!=(*dbmap)->num) {
490                                 continue;
491                         }
492                         /* ok so we need to create this database and
493                            rebuild dbmap
494                          */
495                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
496                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
497                         if (ret != 0) {
498                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
499                                           nodemap->nodes[j].pnn));
500                                 return -1;
501                         }
502                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
503                                            remote_dbmap->dbs[db].persistent);
504                         if (ret != 0) {
505                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
506                                 return -1;
507                         }
508                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
509                         if (ret != 0) {
510                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
511                                 return -1;
512                         }
513                 }
514         }
515
516         return 0;
517 }
518
519
520 /*
521   pull the remote database contents from one node into the recdb
522  */
523 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
524                                     struct tdb_wrap *recdb, uint32_t dbid,
525                                     bool persistent)
526 {
527         int ret;
528         TDB_DATA outdata;
529         struct ctdb_marshall_buffer *reply;
530         struct ctdb_rec_data *rec;
531         int i;
532         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
533
534         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
535                                CONTROL_TIMEOUT(), &outdata);
536         if (ret != 0) {
537                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
538                 talloc_free(tmp_ctx);
539                 return -1;
540         }
541
542         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
543
544         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
545                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
546                 talloc_free(tmp_ctx);
547                 return -1;
548         }
549         
550         rec = (struct ctdb_rec_data *)&reply->data[0];
551         
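        /* the pulled records are packed back to back in the marshall buffer;
           each iteration advances the cursor by the full length of the
           current record */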
552         for (i=0;
553              i<reply->count;
554              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
555                 TDB_DATA key, data;
556                 struct ctdb_ltdb_header *hdr;
557                 TDB_DATA existing;
558                 
559                 key.dptr = &rec->data[0];
560                 key.dsize = rec->keylen;
561                 data.dptr = &rec->data[key.dsize];
562                 data.dsize = rec->datalen;
563                 
564                 hdr = (struct ctdb_ltdb_header *)data.dptr;
565
566                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
567                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
568                         talloc_free(tmp_ctx);
569                         return -1;
570                 }
571
572                 /* fetch the existing record, if any */
573                 existing = tdb_fetch(recdb->tdb, key);
574                 
575                 if (existing.dptr != NULL) {
576                         struct ctdb_ltdb_header header;
577                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
578                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
579                                          (unsigned)existing.dsize, srcnode));
580                                 free(existing.dptr);
581                                 talloc_free(tmp_ctx);
582                                 return -1;
583                         }
584                         header = *(struct ctdb_ltdb_header *)existing.dptr;
585                         free(existing.dptr);
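                        /* keep the pulled record only if its rsn is higher than
                           the copy already in recdb, or equal while the existing
                           copy's dmaster is not the recovery master; otherwise
                           the existing record wins and this one is skipped */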
586                         if (!(header.rsn < hdr->rsn ||
587                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
588                                 continue;
589                         }
590                 }
591                 
592                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
593                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
594                         talloc_free(tmp_ctx);
595                         return -1;                              
596                 }
597         }
598
599         talloc_free(tmp_ctx);
600
601         return 0;
602 }
603
604 /*
605   pull all the remote database contents into the recdb
606  */
607 static int pull_remote_database(struct ctdb_context *ctdb,
608                                 struct ctdb_recoverd *rec, 
609                                 struct ctdb_node_map *nodemap, 
610                                 struct tdb_wrap *recdb, uint32_t dbid,
611                                 bool persistent)
612 {
613         int j;
614
615         /* pull all records from all other nodes across onto this node
616            (this merges based on rsn)
617         */
618         for (j=0; j<nodemap->num; j++) {
619                 /* don't merge from nodes that are unavailable */
620                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
621                         continue;
622                 }
623                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
624                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
625                                  nodemap->nodes[j].pnn));
626                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
627                         return -1;
628                 }
629         }
630         
631         return 0;
632 }
633
634
635 /*
636   update flags on all active nodes
637  */
638 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
639 {
640         int ret;
641
642         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
643         if (ret != 0) {
644                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
645                 return -1;
646         }
647
648         return 0;
649 }
650
651 /*
652   ensure all nodes have the same vnnmap we do
653  */
654 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
655                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
656 {
657         int j, ret;
658
659         /* push the new vnn map out to all the nodes */
660         for (j=0; j<nodemap->num; j++) {
661                 /* don't push to nodes that are unavailable */
662                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
663                         continue;
664                 }
665
666                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
667                 if (ret != 0) {
668                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
669                         return -1;
670                 }
671         }
672
673         return 0;
674 }
675
676
677 struct vacuum_info {
678         struct vacuum_info *next, *prev;
679         struct ctdb_recoverd *rec;
680         uint32_t srcnode;
681         struct ctdb_db_context *ctdb_db;
682         struct ctdb_marshall_buffer *recs;
683         struct ctdb_rec_data *r;
684 };
685
686 static void vacuum_fetch_next(struct vacuum_info *v);
687
688 /*
689   called when a vacuum fetch has completed - just free it and do the next one
690  */
691 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
692 {
693         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
694         talloc_free(state);
695         vacuum_fetch_next(v);
696 }
697
698
699 /*
700   process the next element from the vacuum list
701 */
702 static void vacuum_fetch_next(struct vacuum_info *v)
703 {
704         struct ctdb_call call;
705         struct ctdb_rec_data *r;
706
707         while (v->recs->count) {
708                 struct ctdb_client_call_state *state;
709                 TDB_DATA data;
710                 struct ctdb_ltdb_header *hdr;
711
712                 ZERO_STRUCT(call);
713                 call.call_id = CTDB_NULL_FUNC;
714                 call.flags = CTDB_IMMEDIATE_MIGRATION;
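                /* the null call does no work on the record itself; it is sent
                   only so that CTDB_IMMEDIATE_MIGRATION migrates the record
                   back to this node */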
715
716                 r = v->r;
717                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
718                 v->recs->count--;
719
720                 call.key.dptr = &r->data[0];
721                 call.key.dsize = r->keylen;
722
723                 /* ensure we don't block this daemon - just skip a record if we can't get
724                    the chainlock */
725                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
726                         continue;
727                 }
728
729                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
730                 if (data.dptr == NULL) {
731                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
732                         continue;
733                 }
734
735                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
736                         free(data.dptr);
737                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
738                         continue;
739                 }
740                 
741                 hdr = (struct ctdb_ltdb_header *)data.dptr;
742                 if (hdr->dmaster == v->rec->ctdb->pnn) {
743                         /* it's already local */
744                         free(data.dptr);
745                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
746                         continue;
747                 }
748
749                 free(data.dptr);
750
751                 state = ctdb_call_send(v->ctdb_db, &call);
752                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
753                 if (state == NULL) {
754                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
755                         talloc_free(v);
756                         return;
757                 }
758                 state->async.fn = vacuum_fetch_callback;
759                 state->async.private_data = v;
760                 return;
761         }
762
763         talloc_free(v);
764 }
765
766
767 /*
768   destroy a vacuum info structure
769  */
770 static int vacuum_info_destructor(struct vacuum_info *v)
771 {
772         DLIST_REMOVE(v->rec->vacuum_info, v);
773         return 0;
774 }
775
776
777 /*
778   handler for vacuum fetch
779 */
780 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
781                                  TDB_DATA data, void *private_data)
782 {
783         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
784         struct ctdb_marshall_buffer *recs;
785         int ret, i;
786         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
787         const char *name;
788         struct ctdb_dbid_map *dbmap=NULL;
789         bool persistent = false;
790         struct ctdb_db_context *ctdb_db;
791         struct ctdb_rec_data *r;
792         uint32_t srcnode;
793         struct vacuum_info *v;
794
795         recs = (struct ctdb_marshall_buffer *)data.dptr;
796         r = (struct ctdb_rec_data *)&recs->data[0];
797
798         if (recs->count == 0) {
799                 talloc_free(tmp_ctx);
800                 return;
801         }
802
803         srcnode = r->reqid;
804
805         for (v=rec->vacuum_info;v;v=v->next) {
806                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
807                         /* we're already working on records from this node */
808                         talloc_free(tmp_ctx);
809                         return;
810                 }
811         }
812
813         /* work out if the database is persistent */
814         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
815         if (ret != 0) {
816                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
817                 talloc_free(tmp_ctx);
818                 return;
819         }
820
821         for (i=0;i<dbmap->num;i++) {
822                 if (dbmap->dbs[i].dbid == recs->db_id) {
823                         persistent = dbmap->dbs[i].persistent;
824                         break;
825                 }
826         }
827         if (i == dbmap->num) {
828                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
829                 talloc_free(tmp_ctx);
830                 return;         
831         }
832
833         /* find the name of this database */
834         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
835                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
836                 talloc_free(tmp_ctx);
837                 return;
838         }
839
840         /* attach to it */
841         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
842         if (ctdb_db == NULL) {
843                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
844                 talloc_free(tmp_ctx);
845                 return;
846         }
847
848         v = talloc_zero(rec, struct vacuum_info);
849         if (v == NULL) {
850                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
851                 talloc_free(tmp_ctx);
852                 return;
853         }
854
855         v->rec = rec;
856         v->srcnode = srcnode;
857         v->ctdb_db = ctdb_db;
858         v->recs = talloc_memdup(v, recs, data.dsize);
859         if (v->recs == NULL) {
860                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
861                 talloc_free(v);
862                 talloc_free(tmp_ctx);
863                 return;         
864         }
865         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
866
867         DLIST_ADD(rec->vacuum_info, v);
868
869         talloc_set_destructor(v, vacuum_info_destructor);
870
871         vacuum_fetch_next(v);
872         talloc_free(tmp_ctx);
873 }
874
875
876 /*
877   called when ctdb_wait_timeout should finish
878  */
879 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
880                               struct timeval yt, void *p)
881 {
882         uint32_t *timed_out = (uint32_t *)p;
883         (*timed_out) = 1;
884 }
885
886 /*
887   wait for a given number of seconds
888  */
889 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
890 {
891         uint32_t timed_out = 0;
892         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
893         while (!timed_out) {
894                 event_loop_once(ctdb->ev);
895         }
896 }
897
898 /*
899   called when an election times out (ends)
900  */
901 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
902                                   struct timeval t, void *p)
903 {
904         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
905         rec->election_timeout = NULL;
906
907         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
908 }
909
910
911 /*
912   wait for an election to finish. It finishes election_timeout seconds after
913   the last election packet is received
914  */
915 static void ctdb_wait_election(struct ctdb_recoverd *rec)
916 {
917         struct ctdb_context *ctdb = rec->ctdb;
918         while (rec->election_timeout) {
919                 event_loop_once(ctdb->ev);
920         }
921 }
922
923 /*
924   Update our local flags from all remote connected nodes. 
925   This is only run when we are, or believe we are, the recovery master
926  */
927 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
928 {
929         int j;
930         struct ctdb_context *ctdb = rec->ctdb;
931         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
932
933         /* get the nodemap for all active remote nodes and verify
934            they are the same as for this node
935          */
936         for (j=0; j<nodemap->num; j++) {
937                 struct ctdb_node_map *remote_nodemap=NULL;
938                 int ret;
939
940                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
941                         continue;
942                 }
943                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
944                         continue;
945                 }
946
947                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
948                                            mem_ctx, &remote_nodemap);
949                 if (ret != 0) {
950                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
951                                   nodemap->nodes[j].pnn));
952                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
953                         talloc_free(mem_ctx);
954                         return MONITOR_FAILED;
955                 }
956                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
957                         /* We should tell our daemon about this so it
958                            updates its flags or else we will log the same 
959                            message again in the next iteration of recovery.
960                            Since we are the recovery master we can just as
961                            well update the flags on all nodes.
962                         */
963                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
964                         if (ret != 0) {
965                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
966                                 return -1;
967                         }
968
969                         /* Update our local copy of the flags in the recovery
970                            daemon.
971                         */
972                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
973                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
974                                  nodemap->nodes[j].flags));
975                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
976                 }
977                 talloc_free(remote_nodemap);
978         }
979         talloc_free(mem_ctx);
980         return MONITOR_OK;
981 }
982
983
984 /* Create a new random generation id. 
985    The generation id can not be the INVALID_GENERATION id
986 */
987 static uint32_t new_generation(void)
988 {
989         uint32_t generation;
990
991         while (1) {
992                 generation = random();
993
994                 if (generation != INVALID_GENERATION) {
995                         break;
996                 }
997         }
998
999         return generation;
1000 }
1001
1002
1003 /*
1004   create a temporary working database
1005  */
1006 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1007 {
1008         char *name;
1009         struct tdb_wrap *recdb;
1010         unsigned tdb_flags;
1011
1012         /* open up the temporary recovery database */
1013         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
1014         if (name == NULL) {
1015                 return NULL;
1016         }
1017         unlink(name);
1018
1019         tdb_flags = TDB_NOLOCK;
1020         if (!ctdb->do_setsched) {
1021                 tdb_flags |= TDB_NOMMAP;
1022         }
1023         tdb_flags |= TDB_DISALLOW_NESTING;
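        /* TDB_DISALLOW_NESTING makes tdb_transaction_start() fail if a
           transaction is already open instead of silently nesting it */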
1024
1025         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1026                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1027         if (recdb == NULL) {
1028                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1029         }
1030
1031         talloc_free(name);
1032
1033         return recdb;
1034 }
1035
1036
1037 /* 
1038    a traverse function for pulling all relevant records from recdb
1039  */
1040 struct recdb_data {
1041         struct ctdb_context *ctdb;
1042         struct ctdb_marshall_buffer *recdata;
1043         uint32_t len;
1044         bool failed;
1045         bool persistent;
1046 };
1047
1048 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1049 {
1050         struct recdb_data *params = (struct recdb_data *)p;
1051         struct ctdb_rec_data *rec;
1052         struct ctdb_ltdb_header *hdr;
1053
1054         /* skip empty records */
1055         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1056                 return 0;
1057         }
1058
1059         /* update the dmaster field to point to us */
1060         hdr = (struct ctdb_ltdb_header *)data.dptr;
1061         if (!params->persistent) {
1062                 hdr->dmaster = params->ctdb->pnn;
1063         }
1064
1065         /* add the record to the blob ready to send to the nodes */
1066         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1067         if (rec == NULL) {
1068                 params->failed = true;
1069                 return -1;
1070         }
1071         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1072         if (params->recdata == NULL) {
1073                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1074                          rec->length + params->len, params->recdata->count));
1075                 params->failed = true;
1076                 return -1;
1077         }
1078         params->recdata->count++;
1079         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1080         params->len += rec->length;
1081         talloc_free(rec);
1082
1083         return 0;
1084 }
1085
1086 /*
1087   push the recdb database out to all nodes
1088  */
1089 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1090                                bool persistent,
1091                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1092 {
1093         struct recdb_data params;
1094         struct ctdb_marshall_buffer *recdata;
1095         TDB_DATA outdata;
1096         TALLOC_CTX *tmp_ctx;
1097         uint32_t *nodes;
1098
1099         tmp_ctx = talloc_new(ctdb);
1100         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1101
1102         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1103         CTDB_NO_MEMORY(ctdb, recdata);
1104
1105         recdata->db_id = dbid;
1106
1107         params.ctdb = ctdb;
1108         params.recdata = recdata;
1109         params.len = offsetof(struct ctdb_marshall_buffer, data);
1110         params.failed = false;
1111         params.persistent = persistent;
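        /* params.len starts at the size of the marshall header; traverse_recdb()
           appends each packed record after it and advances params.len */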
1112
1113         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1114                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1115                 talloc_free(params.recdata);
1116                 talloc_free(tmp_ctx);
1117                 return -1;
1118         }
1119
1120         if (params.failed) {
1121                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1122                 talloc_free(params.recdata);
1123                 talloc_free(tmp_ctx);
1124                 return -1;              
1125         }
1126
1127         recdata = params.recdata;
1128
1129         outdata.dptr = (void *)recdata;
1130         outdata.dsize = params.len;
1131
1132         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1133         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1134                                         nodes, 0,
1135                                         CONTROL_TIMEOUT(), false, outdata,
1136                                         NULL, NULL,
1137                                         NULL) != 0) {
1138                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1139                 talloc_free(recdata);
1140                 talloc_free(tmp_ctx);
1141                 return -1;
1142         }
1143
1144         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1145                   dbid, recdata->count));
1146
1147         talloc_free(recdata);
1148         talloc_free(tmp_ctx);
1149
1150         return 0;
1151 }
1152
1153
1154 /*
1155   go through a full recovery on one database 
1156  */
1157 static int recover_database(struct ctdb_recoverd *rec, 
1158                             TALLOC_CTX *mem_ctx,
1159                             uint32_t dbid,
1160                             bool persistent,
1161                             uint32_t pnn, 
1162                             struct ctdb_node_map *nodemap,
1163                             uint32_t transaction_id)
1164 {
1165         struct tdb_wrap *recdb;
1166         int ret;
1167         struct ctdb_context *ctdb = rec->ctdb;
1168         TDB_DATA data;
1169         struct ctdb_control_wipe_database w;
1170         uint32_t *nodes;
1171
1172         recdb = create_recdb(ctdb, mem_ctx);
1173         if (recdb == NULL) {
1174                 return -1;
1175         }
1176
1177         /* pull all remote databases onto the recdb */
1178         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1179         if (ret != 0) {
1180                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1181                 return -1;
1182         }
1183
1184         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1185
1186         /* wipe all the remote databases. This is safe as we are in a transaction */
1187         w.db_id = dbid;
1188         w.transaction_id = transaction_id;
1189
1190         data.dptr = (void *)&w;
1191         data.dsize = sizeof(w);
1192
1193         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1194         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1195                                         nodes, 0,
1196                                         CONTROL_TIMEOUT(), false, data,
1197                                         NULL, NULL,
1198                                         NULL) != 0) {
1199                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1200                 talloc_free(recdb);
1201                 return -1;
1202         }
1203         
1204         /* push out the correct database. This sets the dmaster and skips 
1205            the empty records */
1206         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1207         if (ret != 0) {
1208                 talloc_free(recdb);
1209                 return -1;
1210         }
1211
1212         /* all done with this database */
1213         talloc_free(recdb);
1214
1215         return 0;
1216 }
1217
1218 /*
1219   reload the nodes file 
1220 */
1221 static void reload_nodes_file(struct ctdb_context *ctdb)
1222 {
1223         ctdb->nodes = NULL;
1224         ctdb_load_nodes_file(ctdb);
1225 }
1226
1227         
1228 /*
1229   we are the recmaster, and recovery is needed - start a recovery run
1230  */
1231 static int do_recovery(struct ctdb_recoverd *rec, 
1232                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1233                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1234 {
1235         struct ctdb_context *ctdb = rec->ctdb;
1236         int i, j, ret;
1237         uint32_t generation;
1238         struct ctdb_dbid_map *dbmap;
1239         TDB_DATA data;
1240         uint32_t *nodes;
1241         struct timeval start_time;
1242
1243         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1244
1245         /* if recovery fails, force it again */
1246         rec->need_recovery = true;
1247
1248         for (i=0; i<ctdb->num_nodes; i++) {
1249                 struct ctdb_banning_state *ban_state;
1250
1251                 if (ctdb->nodes[i]->ban_state == NULL) {
1252                         continue;
1253                 }
1254                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1255                 if (ban_state->count < 2*ctdb->num_nodes) {
1256                         continue;
1257                 }
1258                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1259                         ctdb->nodes[i]->pnn, ban_state->count,
1260                         ctdb->tunable.recovery_ban_period));
1261                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1262                 ban_state->count = 0;
1263         }
1264
1265
1266         if (ctdb->tunable.verify_recovery_lock != 0) {
1267                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1268                 start_time = timeval_current();
1269                 if (!ctdb_recovery_lock(ctdb, true)) {
1270                         ctdb_set_culprit(rec, pnn);
1271                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1272                         return -1;
1273                 }
1274                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1275                 DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1276         }
1277
1278         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1279
1280         /* get a list of all databases */
1281         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1282         if (ret != 0) {
1283                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1284                 return -1;
1285         }
1286
1287         /* we do the db creation before we set the recovery mode, so the freeze happens
1288            on all databases we will be dealing with. */
1289
1290         /* verify that we have all the databases any other node has */
1291         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1292         if (ret != 0) {
1293                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1294                 return -1;
1295         }
1296
1297         /* verify that all other nodes have all our databases */
1298         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1299         if (ret != 0) {
1300                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1301                 return -1;
1302         }
1303         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1304
1305         /* update the database priority for all remote databases */
1306         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1307         if (ret != 0) {
1308                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1309         }
1310         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1311
1312
1313         /* set recovery mode to active on all nodes */
1314         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1315         if (ret != 0) {
1316                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1317                 return -1;
1318         }
1319
1320         /* execute the "startrecovery" event script on all nodes */
1321         ret = run_startrecovery_eventscript(rec, nodemap);
1322         if (ret!=0) {
1323                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1324                 return -1;
1325         }
1326
1327         /* pick a new generation number */
1328         generation = new_generation();
1329
1330         /* change the vnnmap on this node to use the new generation 
1331            number but not on any other nodes.
1332            this guarantees that if we abort the recovery prematurely
1333            for some reason (a node stops responding?)
1334            that we can just return immediately and we will reenter
1335            recovery shortly again.
1336            I.e. we deliberately leave the cluster with an inconsistent
1337            generation id to allow us to abort recovery at any stage and
1338            just restart it from scratch.
1339          */
1340         vnnmap->generation = generation;
1341         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1342         if (ret != 0) {
1343                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1344                 return -1;
1345         }
1346
1347         data.dptr = (void *)&generation;
1348         data.dsize = sizeof(uint32_t);
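        /* the new generation number doubles as the transaction id; the same
           data blob is reused below for CTDB_CONTROL_TRANSACTION_COMMIT */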
1349
1350         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1351         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1352                                         nodes, 0,
1353                                         CONTROL_TIMEOUT(), false, data,
1354                                         NULL,
1355                                         transaction_start_fail_callback,
1356                                         rec) != 0) {
1357                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1358                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1359                                         nodes, 0,
1360                                         CONTROL_TIMEOUT(), false, tdb_null,
1361                                         NULL,
1362                                         NULL,
1363                                         NULL) != 0) {
1364                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1365                 }
1366                 return -1;
1367         }
1368
1369         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1370
1371         for (i=0;i<dbmap->num;i++) {
1372                 ret = recover_database(rec, mem_ctx,
1373                                        dbmap->dbs[i].dbid,
1374                                        dbmap->dbs[i].persistent,
1375                                        pnn, nodemap, generation);
1376                 if (ret != 0) {
1377                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1378                         return -1;
1379                 }
1380         }
1381
1382         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1383
1384         /* commit all the changes */
1385         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1386                                         nodes, 0,
1387                                         CONTROL_TIMEOUT(), false, data,
1388                                         NULL, NULL,
1389                                         NULL) != 0) {
1390                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1391                 return -1;
1392         }
1393
1394         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1395         
1396
1397         /* update the capabilities for all nodes */
1398         ret = update_capabilities(ctdb, nodemap);
1399         if (ret!=0) {
1400                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1401                 return -1;
1402         }
1403
1404         /* build a new vnn map with all the currently active and
1405            unbanned nodes */
1406         generation = new_generation();
1407         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1408         CTDB_NO_MEMORY(ctdb, vnnmap);
1409         vnnmap->generation = generation;
1410         vnnmap->size = 0;
1411         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1412         CTDB_NO_MEMORY(ctdb, vnnmap->map);
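        /* walk the nodemap: i indexes every node, j is the next free slot
           in the new map; only active nodes with the LMASTER capability
           are added */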
1413         for (i=j=0;i<nodemap->num;i++) {
1414                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1415                         continue;
1416                 }
1417                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1418                         /* this node cannot be an lmaster */
1419                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1420                         continue;
1421                 }
1422
1423                 vnnmap->size++;
1424                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1425                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1426                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1427
1428         }
1429         if (vnnmap->size == 0) {
1430                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1431                 vnnmap->size++;
1432                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1433                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1434                 vnnmap->map[0] = pnn;
1435         }       
1436
1437         /* update to the new vnnmap on all nodes */
1438         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1439         if (ret != 0) {
1440                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1441                 return -1;
1442         }
1443
1444         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1445
1446         /* update recmaster to point to us for all nodes */
1447         ret = set_recovery_master(ctdb, nodemap, pnn);
1448         if (ret!=0) {
1449                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1450                 return -1;
1451         }
1452
1453         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1454
1455         /*
1456           update all nodes to have the same flags that we have
1457          */
1458         for (i=0;i<nodemap->num;i++) {
1459                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1460                         continue;
1461                 }
1462
1463                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1464                 if (ret != 0) {
1465                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1466                         return -1;
1467                 }
1468         }
1469
1470         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1471
1472         /* disable recovery mode */
1473         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1474         if (ret != 0) {
1475                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1476                 return -1;
1477         }
1478
1479         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1480
1481         /*
1482           tell nodes to takeover their public IPs
1483          */
1484         rec->need_takeover_run = false;
1485         ret = ctdb_takeover_run(ctdb, nodemap);
1486         if (ret != 0) {
1487                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1488                 return -1;
1489         }
1490         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1491
1492         /* execute the "recovered" event script on all nodes */
1493         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1494         if (ret!=0) {
1495                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1496                 return -1;
1497         }
1498
1499         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1500
1501         /* send a message to all clients telling them that the cluster 
1502            has been reconfigured */
1503         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1504
1505         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1506
1507         rec->need_recovery = false;
1508
1509         /* we managed to complete a full recovery, make sure to forgive
1510            any past sins by the nodes that could now participate in the
1511            recovery.
1512         */
1513         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1514         for (i=0;i<nodemap->num;i++) {
1515                 struct ctdb_banning_state *ban_state;
1516
1517                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1518                         continue;
1519                 }
1520
1521                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1522                 if (ban_state == NULL) {
1523                         continue;
1524                 }
1525
1526                 ban_state->count = 0;
1527         }
1528
1529
1530         /* We just finished a recovery successfully. 
1531            We now wait for rerecovery_timeout before we allow 
1532            another recovery to take place.
1533         */
1534         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries suppressed for the rerecovery timeout\n"));
1535         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1536         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1537
1538         return 0;
1539 }
1540
1541
1542 /*
1543   elections are won by first checking the number of connected nodes, then
1544   the priority time, then the pnn
1545  */
1546 struct election_message {
1547         uint32_t num_connected;
1548         struct timeval priority_time;
1549         uint32_t pnn;
1550         uint32_t node_flags;
1551 };
1552
1553 /*
1554   form this node's election data
1555  */
1556 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1557 {
1558         int ret, i;
1559         struct ctdb_node_map *nodemap;
1560         struct ctdb_context *ctdb = rec->ctdb;
1561
1562         ZERO_STRUCTP(em);
1563
1564         em->pnn = rec->ctdb->pnn;
1565         em->priority_time = rec->priority_time;
1566
1567         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1568         if (ret != 0) {
1569                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1570                 return;
1571         }
1572
1573         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1574         em->node_flags = rec->node_flags;
1575
1576         for (i=0;i<nodemap->num;i++) {
1577                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1578                         em->num_connected++;
1579                 }
1580         }
1581
1582         /* we shouldn't try to win this election if we can't be a recmaster */
1583         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1584                 em->num_connected = 0;
1585                 em->priority_time = timeval_current();
1586         }
1587
1588         talloc_free(nodemap);
1589 }
1590
1591 /*
1592   see if the given election data wins
1593  */
1594 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1595 {
1596         struct election_message myem;
1597         int cmp = 0;
1598
1599         ctdb_election_data(rec, &myem);
1600
1601         /* we can't win if we don't have the recmaster capability */
1602         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1603                 return false;
1604         }
1605
1606         /* we can't win if we are banned */
1607         if (rec->node_flags & NODE_FLAGS_BANNED) {
1608                 return false;
1609         }       
1610
1611         /* we can't win if we are stopped */
1612         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1613                 return false;
1614         }       
1615
1616         /* we will automatically win if the other node is banned */
1617         if (em->node_flags & NODE_FLAGS_BANNED) {
1618                 return true;
1619         }
1620
1621         /* we will also automatically win if the other node is stopped */
1622         if (em->node_flags & NODE_FLAGS_STOPPED) {
1623                 return true;
1624         }
1625
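        /* a positive cmp after the chain below means the local node wins;
           note that the priority_time arguments are ordered so that the
           longest running recovery daemon (earliest priority_time) wins
           that comparison */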
1626         /* try to use the most connected node */
1627         if (cmp == 0) {
1628                 cmp = (int)myem.num_connected - (int)em->num_connected;
1629         }
1630
1631         /* then the longest running node */
1632         if (cmp == 0) {
1633                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1634         }
1635
1636         if (cmp == 0) {
1637                 cmp = (int)myem.pnn - (int)em->pnn;
1638         }
1639
1640         return cmp > 0;
1641 }
1642
1643 /*
1644   send out an election request
1645  */
1646 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1647 {
1648         int ret;
1649         TDB_DATA election_data;
1650         struct election_message emsg;
1651         uint64_t srvid;
1652         struct ctdb_context *ctdb = rec->ctdb;
1653
1654         srvid = CTDB_SRVID_RECOVERY;
1655
1656         ctdb_election_data(rec, &emsg);
1657
1658         election_data.dsize = sizeof(struct election_message);
1659         election_data.dptr  = (unsigned char *)&emsg;
1660
1661
1662         /* send an election message to all active nodes */
1663         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1664         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1665
1666
1667         /* A new node that is already frozen has entered the cluster.
1668            The existing nodes are not frozen and don't need to be frozen
1669            until the election has ended and we start the actual recovery
1670         */
1671         if (update_recmaster == true) {
1672                 /* first we assume we will win the election and set 
1673                    recoverymaster to be ourself on the current node
1674                  */
1675                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1676                 if (ret != 0) {
1677                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1678                         return -1;
1679                 }
1680         }
1681
1682
1683         return 0;
1684 }
1685
1686 /*
1687   this function will unban all nodes in the cluster
1688 */
1689 static void unban_all_nodes(struct ctdb_context *ctdb)
1690 {
1691         int ret, i;
1692         struct ctdb_node_map *nodemap;
1693         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1694         
1695         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1696         if (ret != 0) {
1697                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1698                 return;
1699         }
1700
1701         for (i=0;i<nodemap->num;i++) {
1702                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1703                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1704                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1705                 }
1706         }
1707
1708         talloc_free(tmp_ctx);
1709 }
1710
1711
1712 /*
1713   we think we are winning the election - send a broadcast election request
1714  */
1715 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1716 {
1717         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1718         int ret;
1719
1720         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1721         if (ret != 0) {
1722                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1723         }
1724
1725         talloc_free(rec->send_election_te);
1726         rec->send_election_te = NULL;
1727 }
1728
1729 /*
1730   handler for memory dumps
1731 */
1732 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1733                              TDB_DATA data, void *private_data)
1734 {
1735         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1736         TDB_DATA *dump;
1737         int ret;
1738         struct rd_memdump_reply *rd;
1739
1740         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1741                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1742                 talloc_free(tmp_ctx);
1743                 return;
1744         }
1745         rd = (struct rd_memdump_reply *)data.dptr;
1746
1747         dump = talloc_zero(tmp_ctx, TDB_DATA);
1748         if (dump == NULL) {
1749                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1750                 talloc_free(tmp_ctx);
1751                 return;
1752         }
1753         ret = ctdb_dump_memory(ctdb, dump);
1754         if (ret != 0) {
1755                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1756                 talloc_free(tmp_ctx);
1757                 return;
1758         }
1759
1760         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1761
1762         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1763         if (ret != 0) {
1764                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1765                 talloc_free(tmp_ctx);
1766                 return;
1767         }
1768
1769         talloc_free(tmp_ctx);
1770 }
1771
1772 /*
1773   handler for reload_nodes
1774 */
1775 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1776                              TDB_DATA data, void *private_data)
1777 {
1778         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1779
1780         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1781
1782         reload_nodes_file(rec->ctdb);
1783 }
1784
1785
1786 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1787                               struct timeval yt, void *p)
1788 {
1789         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1790
1791         talloc_free(rec->ip_check_disable_ctx);
1792         rec->ip_check_disable_ctx = NULL;
1793 }
1794
1795 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1796                              TDB_DATA data, void *private_data)
1797 {
1798         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1799         uint32_t timeout;
1800
1801         if (rec->ip_check_disable_ctx != NULL) {
1802                 talloc_free(rec->ip_check_disable_ctx);
1803                 rec->ip_check_disable_ctx = NULL;
1804         }
1805
1806         if (data.dsize != sizeof(uint32_t)) {
1807                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data: %lu "
1808                                  "expecting %lu\n", (long unsigned)data.dsize,
1809                                  (long unsigned)sizeof(uint32_t)));
1810                 return;
1811         }
1812         if (data.dptr == NULL) {
1813                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1814                 return;
1815         }
1816
1817         timeout = *((uint32_t *)data.dptr);
1818         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1819
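        /* the re-enable timer below is allocated as a child of
           ip_check_disable_ctx, so freeing that context (either by the
           timer firing or by a new disable request arriving) also cancels
           any pending re-enable event */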
1820         rec->ip_check_disable_ctx = talloc_new(rec);
1821         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1822
1823         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1824 }
1825
1826
1827 /*
1828   handler for ip reallocate, just add it to the list of callers and 
1829   handle this later in the monitor_cluster loop so we do not recurse
1830   with other callers to takeover_run()
1831 */
1832 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1833                              TDB_DATA data, void *private_data)
1834 {
1835         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1836         struct ip_reallocate_list *caller;
1837
1838         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1839                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1840                 return;
1841         }
1842
1843         if (rec->ip_reallocate_ctx == NULL) {
1844                 rec->ip_reallocate_ctx = talloc_new(rec);
1845                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
1846         }
1847
1848         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1849         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1850
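        /* the reply address arrives as a struct rd_memdump_reply (a pnn
           and an srvid) in data.dptr; srvid == 0 means the sender does not
           want a reply.  steal it onto the caller entry so it stays valid
           until process_ipreallocate_requests() answers it from the
           monitor loop */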
1851         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1852         caller->next = rec->reallocate_callers;
1853         rec->reallocate_callers = caller;
1854
1855         return;
1856 }
1857
1858 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1859 {
1860         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1861         TDB_DATA result;
1862         int32_t ret;
1863         struct ip_reallocate_list *callers;
1864
1865         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1866         ret = ctdb_takeover_run(ctdb, rec->nodemap);
1867         result.dsize = sizeof(int32_t);
1868         result.dptr  = (uint8_t *)&ret;
1869
1870         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
1871
1872                 /* Someone that sent srvid==0 does not want a reply */
1873                 if (callers->rd->srvid == 0) {
1874                         continue;
1875                 }
1876                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
1877                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
1878                                   (unsigned long long)callers->rd->srvid));
1879                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
1880                 if (ret != 0) {
1881                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
1882                                          "message to %u:%llu\n",
1883                                          (unsigned)callers->rd->pnn,
1884                                          (unsigned long long)callers->rd->srvid));
1885                 }
1886         }
1887
1888         talloc_free(tmp_ctx);
1889         talloc_free(rec->ip_reallocate_ctx);
1890         rec->ip_reallocate_ctx = NULL;
1891         rec->reallocate_callers = NULL;
1892         
1893 }
1894
1895
1896 /*
1897   handler for recovery master elections
1898 */
1899 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1900                              TDB_DATA data, void *private_data)
1901 {
1902         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1903         int ret;
1904         struct election_message *em = (struct election_message *)data.dptr;
1905         TALLOC_CTX *mem_ctx;
1906
1907         /* we got an election packet - update the timeout for the election */
1908         talloc_free(rec->election_timeout);
1909         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1910                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1911                                                 ctdb_election_timeout, rec);
1912
1913         mem_ctx = talloc_new(ctdb);
1914
1915         /* someone called an election. check their election data
1916            and if we disagree and we would rather be the elected node, 
1917            send a new election message to all other nodes
1918          */
1919         if (ctdb_election_win(rec, em)) {
1920                 if (!rec->send_election_te) {
1921                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1922                                                                 timeval_current_ofs(0, 500000),
1923                                                                 election_send_request, rec);
1924                 }
1925                 talloc_free(mem_ctx);
1926                 /*unban_all_nodes(ctdb);*/
1927                 return;
1928         }
1929         
1930         /* we didn't win */
1931         talloc_free(rec->send_election_te);
1932         rec->send_election_te = NULL;
1933
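        /* if reclock verification is in use and another node won, release
           our hold on the recovery lock so the winner can take it, and
           clear all bans */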
1934         if (ctdb->tunable.verify_recovery_lock != 0) {
1935                 /* release the recmaster lock */
1936                 if (em->pnn != ctdb->pnn &&
1937                     ctdb->recovery_lock_fd != -1) {
1938                         close(ctdb->recovery_lock_fd);
1939                         ctdb->recovery_lock_fd = -1;
1940                         unban_all_nodes(ctdb);
1941                 }
1942         }
1943
1944         /* ok, let that guy become recmaster then */
1945         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1946         if (ret != 0) {
1947                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1948                 talloc_free(mem_ctx);
1949                 return;
1950         }
1951
1952         talloc_free(mem_ctx);
1953         return;
1954 }
1955
1956
1957 /*
1958   force the start of the election process
1959  */
1960 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1961                            struct ctdb_node_map *nodemap)
1962 {
1963         int ret;
1964         struct ctdb_context *ctdb = rec->ctdb;
1965
1966         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1967
1968         /* set all nodes to recovery mode to stop all internode traffic */
1969         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1970         if (ret != 0) {
1971                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1972                 return;
1973         }
1974
1975         talloc_free(rec->election_timeout);
1976         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1977                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1978                                                 ctdb_election_timeout, rec);
1979
1980         ret = send_election_request(rec, pnn, true);
1981         if (ret!=0) {
1982                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
1983                 return;
1984         }
1985
1986         /* wait for a few seconds to collect all responses */
1987         ctdb_wait_election(rec);
1988 }
1989
1990
1991
1992 /*
1993   handler for when a node changes its flags
1994 */
1995 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1996                             TDB_DATA data, void *private_data)
1997 {
1998         int ret;
1999         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2000         struct ctdb_node_map *nodemap=NULL;
2001         TALLOC_CTX *tmp_ctx;
2002         uint32_t changed_flags;
2003         int i;
2004         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2005         int disabled_flag_changed;
2006
2007         if (data.dsize != sizeof(*c)) {
2008                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2009                 return;
2010         }
2011
2012         tmp_ctx = talloc_new(ctdb);
2013         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2014
2015         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2016         if (ret != 0) {
2017                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2018                 talloc_free(tmp_ctx);
2019                 return;         
2020         }
2021
2022
2023         for (i=0;i<nodemap->num;i++) {
2024                 if (nodemap->nodes[i].pnn == c->pnn) break;
2025         }
2026
2027         if (i == nodemap->num) {
2028                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2029                 talloc_free(tmp_ctx);
2030                 return;
2031         }
2032
2033         changed_flags = c->old_flags ^ c->new_flags;
2034
2035         if (nodemap->nodes[i].flags != c->new_flags) {
2036                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2037         }
2038
2039         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2040
2041         nodemap->nodes[i].flags = c->new_flags;
2042
2043         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2044                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2045
2046         if (ret == 0) {
2047                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2048                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2049         }
2050         
2051         if (ret == 0 &&
2052             ctdb->recovery_master == ctdb->pnn &&
2053             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2054                 /* Only do the takeover run if the perm disabled or unhealthy
2055                    flags changed since these will cause an ip failover but not
2056                    a recovery.
2057                    If the node became disconnected or banned this will also
2058                    lead to an ip address failover but that is handled 
2059                    during recovery
2060                 */
2061                 if (disabled_flag_changed) {
2062                         rec->need_takeover_run = true;
2063                 }
2064         }
2065
2066         talloc_free(tmp_ctx);
2067 }
2068
2069 /*
2070   handler for when we need to push out flag changes to all other nodes
2071 */
2072 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2073                             TDB_DATA data, void *private_data)
2074 {
2075         int ret;
2076         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2077         struct ctdb_node_map *nodemap=NULL;
2078         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2079         uint32_t recmaster;
2080         uint32_t *nodes;
2081
2082         /* find the recovery master */
2083         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2084         if (ret != 0) {
2085                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2086                 talloc_free(tmp_ctx);
2087                 return;
2088         }
2089
2090         /* read the node flags from the recmaster */
2091         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2092         if (ret != 0) {
2093                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", recmaster));
2094                 talloc_free(tmp_ctx);
2095                 return;
2096         }
2097         if (c->pnn >= nodemap->num) {
2098                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2099                 talloc_free(tmp_ctx);
2100                 return;
2101         }
2102
2103         /* send the flags update to all connected nodes */
2104         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2105
2106         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2107                                       nodes, 0, CONTROL_TIMEOUT(),
2108                                       false, data,
2109                                       NULL, NULL,
2110                                       NULL) != 0) {
2111                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2112
2113                 talloc_free(tmp_ctx);
2114                 return;
2115         }
2116
2117         talloc_free(tmp_ctx);
2118 }
2119
2120
2121 struct verify_recmode_normal_data {
2122         uint32_t count;
2123         enum monitor_result status;
2124 };
2125
2126 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2127 {
2128         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2129
2130
2131         /* one more node has responded with recmode data*/
2132         rmdata->count--;
2133
2134         /* if we failed to get the recmode, then return an error and let
2135            the main loop try again.
2136         */
2137         if (state->state != CTDB_CONTROL_DONE) {
2138                 if (rmdata->status == MONITOR_OK) {
2139                         rmdata->status = MONITOR_FAILED;
2140                 }
2141                 return;
2142         }
2143
2144         /* if we got a response, then the recmode will be stored in the
2145            status field
2146         */
2147         if (state->status != CTDB_RECOVERY_NORMAL) {
2148                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2149                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2150         }
2151
2152         return;
2153 }
2154
2155
2156 /* verify that all nodes are in normal recovery mode */
2157 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2158 {
2159         struct verify_recmode_normal_data *rmdata;
2160         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2161         struct ctdb_client_control_state *state;
2162         enum monitor_result status;
2163         int j;
2164         
2165         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2166         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2167         rmdata->count  = 0;
2168         rmdata->status = MONITOR_OK;
2169
2170         /* loop over all active nodes and send an async getrecmode call to 
2171            them */
2172         for (j=0; j<nodemap->num; j++) {
2173                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2174                         continue;
2175                 }
2176                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2177                                         CONTROL_TIMEOUT(), 
2178                                         nodemap->nodes[j].pnn);
2179                 if (state == NULL) {
2180                         /* we failed to send the control, treat this as 
2181                            an error and try again next iteration
2182                         */                      
2183                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2184                         talloc_free(mem_ctx);
2185                         return MONITOR_FAILED;
2186                 }
2187
2188                 /* set up the callback functions */
2189                 state->async.fn = verify_recmode_normal_callback;
2190                 state->async.private_data = rmdata;
2191
2192                 /* one more control to wait for to complete */
2193                 rmdata->count++;
2194         }
2195
2196
2197         /* now wait for up to the maximum number of seconds allowed
2198            or until all nodes we expect a response from have replied
2199         */
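        /* note: the async callback is also invoked when a control times out
           or fails (state->state != CTDB_CONTROL_DONE), so rmdata->count
           should always reach zero and this loop terminate */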
2200         while (rmdata->count > 0) {
2201                 event_loop_once(ctdb->ev);
2202         }
2203
2204         status = rmdata->status;
2205         talloc_free(mem_ctx);
2206         return status;
2207 }
2208
2209
2210 struct verify_recmaster_data {
2211         struct ctdb_recoverd *rec;
2212         uint32_t count;
2213         uint32_t pnn;
2214         enum monitor_result status;
2215 };
2216
2217 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2218 {
2219         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2220
2221
2222         /* one more node has responded with recmaster data*/
2223         rmdata->count--;
2224
2225         /* if we failed to get the recmaster, then return an error and let
2226            the main loop try again.
2227         */
2228         if (state->state != CTDB_CONTROL_DONE) {
2229                 if (rmdata->status == MONITOR_OK) {
2230                         rmdata->status = MONITOR_FAILED;
2231                 }
2232                 return;
2233         }
2234
2235         /* if we got a response, then the recmaster will be stored in the
2236            status field
2237         */
2238         if (state->status != rmdata->pnn) {
2239                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2240                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2241                 rmdata->status = MONITOR_ELECTION_NEEDED;
2242         }
2243
2244         return;
2245 }
2246
2247
2248 /* verify that all nodes agree that we are the recmaster */
2249 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2250 {
2251         struct ctdb_context *ctdb = rec->ctdb;
2252         struct verify_recmaster_data *rmdata;
2253         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2254         struct ctdb_client_control_state *state;
2255         enum monitor_result status;
2256         int j;
2257         
2258         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2259         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2260         rmdata->rec    = rec;
2261         rmdata->count  = 0;
2262         rmdata->pnn    = pnn;
2263         rmdata->status = MONITOR_OK;
2264
2265         /* loop over all active nodes and send an async getrecmaster call to 
2266            them */
2267         for (j=0; j<nodemap->num; j++) {
2268                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2269                         continue;
2270                 }
2271                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2272                                         CONTROL_TIMEOUT(),
2273                                         nodemap->nodes[j].pnn);
2274                 if (state == NULL) {
2275                         /* we failed to send the control, treat this as 
2276                            an error and try again next iteration
2277                         */                      
2278                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2279                         talloc_free(mem_ctx);
2280                         return MONITOR_FAILED;
2281                 }
2282
2283                 /* set up the callback functions */
2284                 state->async.fn = verify_recmaster_callback;
2285                 state->async.private_data = rmdata;
2286
2287                 /* one more control to wait for to complete */
2288                 rmdata->count++;
2289         }
2290
2291
2292         /* now wait for up to the maximum number of seconds allowed
2293            or until all nodes we expect a response from have replied
2294         */
2295         while (rmdata->count > 0) {
2296                 event_loop_once(ctdb->ev);
2297         }
2298
2299         status = rmdata->status;
2300         talloc_free(mem_ctx);
2301         return status;
2302 }
2303
2304
2305 /* called to check that the allocation of public ip addresses is ok.
2306 */
2307 static int verify_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2308 {
2309         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2310         struct ctdb_all_public_ips *ips = NULL;
2311         struct ctdb_uptime *uptime1 = NULL;
2312         struct ctdb_uptime *uptime2 = NULL;
2313         int ret, j;
2314
2315         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2316                                 CTDB_CURRENT_NODE, &uptime1);
2317         if (ret != 0) {
2318                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2319                 talloc_free(mem_ctx);
2320                 return -1;
2321         }
2322
2323         /* read the ip allocation from the local node */
2324         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2325         if (ret != 0) {
2326                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2327                 talloc_free(mem_ctx);
2328                 return -1;
2329         }
2330
2331         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2332                                 CTDB_CURRENT_NODE, &uptime2);
2333         if (ret != 0) {
2334                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2335                 talloc_free(mem_ctx);
2336                 return -1;
2337         }
2338
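        /* uptime1 and uptime2 bracket the get_public_ips call above; if a
           recovery or ip reallocation started or finished in between, the
           ip list may be stale, so the checks below skip this round */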
2339         /* skip the check if the startrecovery time has changed */
2340         if (timeval_compare(&uptime1->last_recovery_started,
2341                             &uptime2->last_recovery_started) != 0) {
2342                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2343                 talloc_free(mem_ctx);
2344                 return 0;
2345         }
2346
2347         /* skip the check if the endrecovery time has changed */
2348         if (timeval_compare(&uptime1->last_recovery_finished,
2349                             &uptime2->last_recovery_finished) != 0) {
2350                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2351                 talloc_free(mem_ctx);
2352                 return 0;
2353         }
2354
2355         /* skip the check if we have started but not finished recovery */
2356         if (timeval_compare(&uptime1->last_recovery_finished,
2357                             &uptime1->last_recovery_started) != 1) {
2358                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2359                 talloc_free(mem_ctx);
2360
2361                 return 0;
2362         }
2363
2364         /* verify that we have the ip addresses we should have
2365            and we don't have ones we shouldn't have.
2366            if we find an inconsistency we set recmode to
2367            active on the local node and wait for the recmaster
2368            to do a full blown recovery
2369         */
2370         for (j=0; j<ips->num; j++) {
2371                 if (ips->ips[j].pnn == pnn) {
2372                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2373                                 struct takeover_run_reply rd;
2374                                 TDB_DATA data;
2375
2376                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2377                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2378
2379                                 rd.pnn   = ctdb->pnn;
2380                                 rd.srvid = 0;
2381                                 data.dptr = (uint8_t *)&rd;
2382                                 data.dsize = sizeof(rd);
2383
2384                                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2385                                 if (ret != 0) {
2386                                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2387                                 }
2388                         }
2389                 } else {
2390                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2391                                 struct takeover_run_reply rd;
2392                                 TDB_DATA data;
2393
2394                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2395                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2396
2397                                 rd.pnn   = ctdb->pnn;
2398                                 rd.srvid = 0;
2399                                 data.dptr = (uint8_t *)&rd;
2400                                 data.dsize = sizeof(rd);
2401
2402                                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2403                                 if (ret != 0) {
2404                                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2405                                 }
2406                         }
2407                 }
2408         }
2409
2410         talloc_free(mem_ctx);
2411         return 0;
2412 }
2413
2414
2415 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2416 {
2417         struct ctdb_node_map **remote_nodemaps = callback_data;
2418
2419         if (node_pnn >= ctdb->num_nodes) {
2420                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2421                 return;
2422         }
2423
2424         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2425
2426 }
2427
2428 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2429         struct ctdb_node_map *nodemap,
2430         struct ctdb_node_map **remote_nodemaps)
2431 {
2432         uint32_t *nodes;
2433
2434         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2435         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2436                                         nodes, 0,
2437                                         CONTROL_TIMEOUT(), false, tdb_null,
2438                                         async_getnodemap_callback,
2439                                         NULL,
2440                                         remote_nodemaps) != 0) {
2441                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2442
2443                 return -1;
2444         }
2445
2446         return 0;
2447 }
2448
2449 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2450 struct ctdb_check_reclock_state {
2451         struct ctdb_context *ctdb;
2452         struct timeval start_time;
2453         int fd[2];
2454         pid_t child;
2455         struct timed_event *te;
2456         struct fd_event *fde;
2457         enum reclock_child_status status;
2458 };
2459
2460 /* when we free the reclock state we must kill any child process.
2461 */
2462 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2463 {
2464         struct ctdb_context *ctdb = state->ctdb;
2465
2466         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2467
2468         if (state->fd[0] != -1) {
2469                 close(state->fd[0]);
2470                 state->fd[0] = -1;
2471         }
2472         if (state->fd[1] != -1) {
2473                 close(state->fd[1]);
2474                 state->fd[1] = -1;
2475         }
2476         kill(state->child, SIGKILL);
2477         return 0;
2478 }
2479
2480 /*
2481   called if our check_reclock child times out. this would happen if
2482   i/o to the reclock file blocks.
2483  */
2484 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2485                                          struct timeval t, void *private_data)
2486 {
2487         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2488                                            struct ctdb_check_reclock_state);
2489
2490         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - CFS slow to grant locks?\n"));
2491         state->status = RECLOCK_TIMEOUT;
2492 }
2493
2494 /* this is called when the child process has completed checking the reclock
2495    file and has written data back to us through the pipe.
2496 */
2497 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2498                              uint16_t flags, void *private_data)
2499 {
2500         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2501                                              struct ctdb_check_reclock_state);
2502         char c = 0;
2503         int ret;
2504
2505         /* we got a response from our child process so we can abort the
2506            timeout.
2507         */
2508         talloc_free(state->te);
2509         state->te = NULL;
2510
2511         ret = read(state->fd[0], &c, 1);
2512         if (ret != 1 || c != RECLOCK_OK) {
2513                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2514                 state->status = RECLOCK_FAILED;
2515
2516                 return;
2517         }
2518
2519         state->status = RECLOCK_OK;
2520         return;
2521 }
2522
2523 static int check_recovery_lock(struct ctdb_context *ctdb)
2524 {
2525         int ret;
2526         struct ctdb_check_reclock_state *state;
2527         pid_t parent = getpid();
2528
2529         if (ctdb->recovery_lock_fd == -1) {
2530                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2531                 return -1;
2532         }
2533
2534         state = talloc(ctdb, struct ctdb_check_reclock_state);
2535         CTDB_NO_MEMORY(ctdb, state);
2536
2537         state->ctdb = ctdb;
2538         state->start_time = timeval_current();
2539         state->status = RECLOCK_CHECKING;
2540         state->fd[0] = -1;
2541         state->fd[1] = -1;
2542
2543         ret = pipe(state->fd);
2544         if (ret != 0) {
2545                 talloc_free(state);
2546                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2547                 return -1;
2548         }
2549
2550         state->child = fork();
2551         if (state->child == (pid_t)-1) {
2552                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2553                 close(state->fd[0]);
2554                 state->fd[0] = -1;
2555                 close(state->fd[1]);
2556                 state->fd[1] = -1;
2557                 talloc_free(state);
2558                 return -1;
2559         }
2560
2561         if (state->child == 0) {
2562                 char cc = RECLOCK_OK;
2563                 close(state->fd[0]);
2564                 state->fd[0] = -1;
2565
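                /* the child inherits the parent's open recovery_lock_fd
                   across fork(); only this child blocks if i/o to the
                   cluster filesystem hangs, and the 15 second timed event
                   set up by the parent then flags RECLOCK_TIMEOUT */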
2566                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2567                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2568                         cc = RECLOCK_FAILED;
2569                 }
2570
2571                 write(state->fd[1], &cc, 1);
2572                 /* make sure we die when our parent dies */
2573                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2574                         sleep(5);
2575                         write(state->fd[1], &cc, 1);
2576                 }
2577                 _exit(0);
2578         }
2579         close(state->fd[1]);
2580         state->fd[1] = -1;
2581         set_close_on_exec(state->fd[0]);
2582
2583         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2584
2585         talloc_set_destructor(state, check_reclock_destructor);
2586
2587         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2588                                     ctdb_check_reclock_timeout, state);
2589         if (state->te == NULL) {
2590                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2591                 talloc_free(state);
2592                 return -1;
2593         }
2594
2595         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2596                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2597                                 reclock_child_handler,
2598                                 (void *)state);
2599
2600         if (state->fde == NULL) {
2601                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2602                 talloc_free(state);
2603                 return -1;
2604         }
2605
2606         while (state->status == RECLOCK_CHECKING) {
2607                 event_loop_once(ctdb->ev);
2608         }
2609
2610         if (state->status == RECLOCK_FAILED) {
2611                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2612                 close(ctdb->recovery_lock_fd);
2613                 ctdb->recovery_lock_fd = -1;
2614                 talloc_free(state);
2615                 return -1;
2616         }
2617
2618         talloc_free(state);
2619         return 0;
2620 }
2621
2622 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2623 {
2624         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2625         const char *reclockfile;
2626
2627         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2628                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2629                 talloc_free(tmp_ctx);
2630                 return -1;      
2631         }
2632
2633         if (reclockfile == NULL) {
2634                 if (ctdb->recovery_lock_file != NULL) {
2635                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2636                         talloc_free(ctdb->recovery_lock_file);
2637                         ctdb->recovery_lock_file = NULL;
2638                         if (ctdb->recovery_lock_fd != -1) {
2639                                 close(ctdb->recovery_lock_fd);
2640                                 ctdb->recovery_lock_fd = -1;
2641                         }
2642                 }
2643                 ctdb->tunable.verify_recovery_lock = 0;
2644                 talloc_free(tmp_ctx);
2645                 return 0;
2646         }
2647
2648         if (ctdb->recovery_lock_file == NULL) {
2649                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2650                 if (ctdb->recovery_lock_fd != -1) {
2651                         close(ctdb->recovery_lock_fd);
2652                         ctdb->recovery_lock_fd = -1;
2653                 }
2654                 talloc_free(tmp_ctx);
2655                 return 0;
2656         }
2657
2658
2659         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2660                 talloc_free(tmp_ctx);
2661                 return 0;
2662         }
2663
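        /* the reclock file name has changed: record the new name, disable
           verification for now and close our fd for the old file */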
2664         talloc_free(ctdb->recovery_lock_file);
2665         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2666         ctdb->tunable.verify_recovery_lock = 0;
2667         if (ctdb->recovery_lock_fd != -1) {
2668                 close(ctdb->recovery_lock_fd);
2669                 ctdb->recovery_lock_fd = -1;
2670         }
2671
2672         talloc_free(tmp_ctx);
2673         return 0;
2674 }
2675                 
2676 /*
2677   the main monitoring loop
2678  */
2679 static void monitor_cluster(struct ctdb_context *ctdb)
2680 {
2681         uint32_t pnn;
2682         TALLOC_CTX *mem_ctx=NULL;
2683         struct ctdb_node_map *nodemap=NULL;
2684         struct ctdb_node_map *recmaster_nodemap=NULL;
2685         struct ctdb_node_map **remote_nodemaps=NULL;
2686         struct ctdb_vnn_map *vnnmap=NULL;
2687         struct ctdb_vnn_map *remote_vnnmap=NULL;
2688         int32_t debug_level;
2689         int i, j, ret;
2690         struct ctdb_recoverd *rec;
2691
2692         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2693
2694         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2695         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2696
2697         rec->ctdb = ctdb;
2698
2699         rec->priority_time = timeval_current();
2700
2701         /* register a message port for sending memory dumps */
2702         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2703
2704         /* register a message port for recovery elections */
2705         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2706
2707         /* when nodes are disabled/enabled */
2708         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2709
2710         /* when we are asked to push out a flag change */
2711         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2712
2713         /* register a message port for vacuum fetch */
2714         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2715
2716         /* register a message port for reloadnodes  */
2717         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2718
2719         /* register a message port for performing a takeover run */
2720         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2721
2722         /* register a message port for disabling the ip check for a short while */
2723         ctdb_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
2724
2725 again:
2726         if (mem_ctx) {
2727                 talloc_free(mem_ctx);
2728                 mem_ctx = NULL;
2729         }
2730         mem_ctx = talloc_new(ctdb);
2731         if (!mem_ctx) {
2732                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2733                 exit(-1);
2734         }
2735
2736         /* we only check for recovery once every recover_interval seconds */
2737         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2738
2739         /* verify that the main daemon is still running */
2740         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2741                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2742                 exit(-1);
2743         }
2744
2745         /* ping the local daemon to tell it we are alive */
2746         ctdb_ctrl_recd_ping(ctdb);
2747
2748         if (rec->election_timeout) {
2749                 /* an election is in progress */
2750                 goto again;
2751         }
2752
2753         /* read the debug level from the parent and update locally */
2754         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2755         if (ret !=0) {
2756                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2757                 goto again;
2758         }
2759         LogLevel = debug_level;
2760
2761
2762         /* We must check if we need to ban a node here but we want to do this
2763            as early as possible so we don't wait until we have pulled the node
2764            map from the local node. That's why we use the hardcoded value 20.
2765         */
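        /* ban_state->count is bumped each time a node is blamed as the
           culprit for a recovery (see ctdb_set_culprit); once it reaches
           20 we ban the node for recovery_ban_period and reset the count */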
2766         for (i=0; i<ctdb->num_nodes; i++) {
2767                 struct ctdb_banning_state *ban_state;
2768
2769                 if (ctdb->nodes[i]->ban_state == NULL) {
2770                         continue;
2771                 }
2772                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2773                 if (ban_state->count < 20) {
2774                         continue;
2775                 }
2776                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2777                         ctdb->nodes[i]->pnn, ban_state->count,
2778                         ctdb->tunable.recovery_ban_period));
2779                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2780                 ban_state->count = 0;
2781         }
2782
2783         /* get relevant tunables */
2784         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2785         if (ret != 0) {
2786                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2787                 goto again;
2788         }
2789
2790         /* get the current recovery lock file from the server */
2791         if (update_recovery_lock_file(ctdb) != 0) {
2792                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2793                 goto again;
2794         }
2795
2796         /* Make sure that if recovery lock verification becomes disabled,
2797            we close the file
2798         */
2799         if (ctdb->tunable.verify_recovery_lock == 0) {
2800                 if (ctdb->recovery_lock_fd != -1) {
2801                         close(ctdb->recovery_lock_fd);
2802                         ctdb->recovery_lock_fd = -1;
2803                 }
2804         }
2805
2806         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2807         if (pnn == (uint32_t)-1) {
2808                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2809                 goto again;
2810         }
2811
2812         /* get the vnnmap */
2813         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2814         if (ret != 0) {
2815                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2816                 goto again;
2817         }
2818
2819
2820         /* get the nodemap from the local node */
2821         if (rec->nodemap) {
2822                 talloc_free(rec->nodemap);
2823                 rec->nodemap = NULL;
2824                 nodemap=NULL;
2825         }
2826         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2827         if (ret != 0) {
2828                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2829                 goto again;
2830         }
2831         nodemap = rec->nodemap;
2832
2833         /* check which node is the recovery master */
2834         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2835         if (ret != 0) {
2836                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2837                 goto again;
2838         }
2839
2840         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
2841         if (rec->recmaster != pnn) {
2842                 if (rec->ip_reallocate_ctx != NULL) {
2843                         talloc_free(rec->ip_reallocate_ctx);
2844                         rec->ip_reallocate_ctx = NULL;
2845                         rec->reallocate_callers = NULL;
2846                 }
2847         }
2848         /* if takeovers have been requested, perform them and notify the waiters */
2849         if (rec->reallocate_callers) {
2850                 process_ipreallocate_requests(ctdb, rec);
2851         }
2852
2853         if (rec->recmaster == (uint32_t)-1) {
2854                 DEBUG(DEBUG_NOTICE,(__location__ " No recovery master is set yet - forcing election\n"));
2855                 force_election(rec, pnn, nodemap);
2856                 goto again;
2857         }
2858
2859
2860         /* if the local daemon is STOPPED, we verify that the databases are
2861            also frozen and that the recovery mode is set to active
2862         */
2863         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
2864                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2865                 if (ret != 0) {
2866                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
2867                 }
2868                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2869                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
2870
2871                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
2872                         if (ret != 0) {
2873                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
2874                                 goto again;
2875                         }
2876                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2877                         if (ret != 0) {
2878                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
2879
2880                                 goto again;
2881                         }
2882                         goto again;
2883                 }
2884         }
2885         /* If the local node is stopped and we happen to be the recmaster,
2886            yield that role
2887         */
2888         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
2889                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
2890                 force_election(rec, pnn, nodemap);
2891                 goto again;
2892         }
2893         
2894         /* check that we (the recovery daemon) and the local ctdb daemon
2895            agree on whether we are banned or not
2896         */
2897 //qqq
2898
2899         /* remember our own node flags */
2900         rec->node_flags = nodemap->nodes[pnn].flags;
2901
2902         /* count how many active nodes there are */
2903         rec->num_active    = 0;
2904         rec->num_connected = 0;
2905         for (i=0; i<nodemap->num; i++) {
2906                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2907                         rec->num_active++;
2908                 }
2909                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2910                         rec->num_connected++;
2911                 }
2912         }
2913
2914
2915         /* verify that the recmaster node is still active */
2916         for (j=0; j<nodemap->num; j++) {
2917                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2918                         break;
2919                 }
2920         }
2921
2922         if (j == nodemap->num) {
2923                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2924                 force_election(rec, pnn, nodemap);
2925                 goto again;
2926         }
2927
2928         /* if recovery master is disconnected we must elect a new recmaster */
2929         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2930                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2931                 force_election(rec, pnn, nodemap);
2932                 goto again;
2933         }
2934
2935         /* grab the nodemap from the recovery master to check if it is banned */
2936         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2937                                    mem_ctx, &recmaster_nodemap);
2938         if (ret != 0) {
2939                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2940                           nodemap->nodes[j].pnn));
2941                 goto again;
2942         }
2943
2944
2945         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2946                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2947                 force_election(rec, pnn, nodemap);
2948                 goto again;
2949         }
2950
2951
2952         /* verify that we have all the ip addresses we should have and that
2953          * we don't have addresses we shouldn't have.
2954          */
2955         if (ctdb->do_checkpublicip) {
2956                 if (rec->ip_check_disable_ctx == NULL) {
2957                         if (verify_ip_allocation(ctdb, rec, pnn) != 0) {
2958                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
2959                         }
2960                 }
2961         }
2962
2963
2964         /* if we are not the recmaster then we do not need to check
2965            if recovery is needed
2966          */
2967         if (pnn != rec->recmaster) {
2968                 goto again;
2969         }
2970
2971
2972         /* ensure our local copies of flags are right */
2973         ret = update_local_flags(rec, nodemap);
2974         if (ret == MONITOR_ELECTION_NEEDED) {
2975                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2976                 force_election(rec, pnn, nodemap);
2977                 goto again;
2978         }
2979         if (ret != MONITOR_OK) {
2980                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2981                 goto again;
2982         }
2983
2984         /* update the list of public ips that a node can handle for
2985            all connected nodes
2986         */
2987         if (ctdb->num_nodes != nodemap->num) {
2988                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2989                 reload_nodes_file(ctdb);
2990                 goto again;
2991         }
2992         for (j=0; j<nodemap->num; j++) {
2993                 /* release any existing data */
2994                 if (ctdb->nodes[j]->public_ips) {
2995                         talloc_free(ctdb->nodes[j]->public_ips);
2996                         ctdb->nodes[j]->public_ips = NULL;
2997                 }
2998
2999                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3000                         continue;
3001                 }
3002
3003                 /* grab a new shiny list of public ips from the node */
3004                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
3005                         ctdb->nodes[j]->pnn, 
3006                         ctdb->nodes,
3007                         &ctdb->nodes[j]->public_ips)) {
3008                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
3009                                 ctdb->nodes[j]->pnn));
3010                         goto again;
3011                 }
3012         }
3013
3014
3015         /* verify that all active nodes agree that we are the recmaster */
3016         switch (verify_recmaster(rec, nodemap, pnn)) {
3017         case MONITOR_RECOVERY_NEEDED:
3018                 /* can not happen */
3019                 goto again;
3020         case MONITOR_ELECTION_NEEDED:
3021                 force_election(rec, pnn, nodemap);
3022                 goto again;
3023         case MONITOR_OK:
3024                 break;
3025         case MONITOR_FAILED:
3026                 goto again;
3027         }
3028
3029
3030         if (rec->need_recovery) {
3031                 /* a previous recovery didn't finish */
3032                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3033                 goto again;             
3034         }
3035
3036         /* verify that all active nodes are in normal mode 
3037            and not in recovery mode 
3038         */
3039         switch (verify_recmode(ctdb, nodemap)) {
3040         case MONITOR_RECOVERY_NEEDED:
3041                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3042                 goto again;
3043         case MONITOR_FAILED:
3044                 goto again;
3045         case MONITOR_ELECTION_NEEDED:
3046                 /* can not happen */
3047         case MONITOR_OK:
3048                 break;
3049         }
3050
3051
3052         if (ctdb->tunable.verify_recovery_lock != 0) {
3053                 /* we should have the reclock - check it is not stale */
3054                 ret = check_recovery_lock(ctdb);
3055                 if (ret != 0) {
3056                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3057                         ctdb_set_culprit(rec, ctdb->pnn);
3058                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3059                         goto again;
3060                 }
3061         }
3062
3063         /* get the nodemap for all active remote nodes
3064          */
3065         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3066         if (remote_nodemaps == NULL) {
3067                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3068                 goto again;
3069         }
3070         for(i=0; i<nodemap->num; i++) {
3071                 remote_nodemaps[i] = NULL;
3072         }
3073         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3074                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3075                 goto again;
3076         } 
3077
3078         /* verify that all other nodes have the same nodemap as we have
3079         */
3080         for (j=0; j<nodemap->num; j++) {
3081                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3082                         continue;
3083                 }
3084
3085                 if (remote_nodemaps[j] == NULL) {
3086                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3087                         ctdb_set_culprit(rec, j);
3088
3089                         goto again;
3090                 }
3091
3092                 /* if the nodes disagree on how many nodes there are
3093                    then this is a good reason to try recovery
3094                  */
3095                 if (remote_nodemaps[j]->num != nodemap->num) {
3096                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3097                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3098                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3099                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3100                         goto again;
3101                 }
3102
3103                 /* if the nodes disagree on which nodes exist and are
3104                    active, then that is also a good reason to do recovery
3105                  */
3106                 for (i=0;i<nodemap->num;i++) {
3107                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3108                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3109                                           nodemap->nodes[j].pnn, i, 
3110                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3111                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3112                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3113                                             vnnmap);
3114                                 goto again;
3115                         }
3116                 }
3117
3118                 /* verify the flags are consistent
3119                 */
3120                 for (i=0; i<nodemap->num; i++) {
3121                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3122                                 continue;
3123                         }
3124                         
3125                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3126                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3127                                   nodemap->nodes[j].pnn, 
3128                                   nodemap->nodes[i].pnn, 
3129                                   remote_nodemaps[j]->nodes[i].flags,
3130                                   nodemap->nodes[i].flags));
3131                                 if (i == j) {
3132                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3133                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3134                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3135                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3136                                                     vnnmap);
3137                                         goto again;
3138                                 } else {
3139                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3140                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3141                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3142                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3143                                                     vnnmap);
3144                                         goto again;
3145                                 }
3146                         }
3147                 }
3148         }
3149
3150
3151         /* there must be the same number of lmasters in the vnn map as
3152            there are active nodes, otherwise we will have to do a recovery
3153          */
3154         if (vnnmap->size != rec->num_active) {
3155                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3156                           vnnmap->size, rec->num_active));
3157                 ctdb_set_culprit(rec, ctdb->pnn);
3158                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3159                 goto again;
3160         }
3161
3162         /* verify that all active nodes in the nodemap also exist in 
3163            the vnnmap.
3164          */
3165         for (j=0; j<nodemap->num; j++) {
3166                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3167                         continue;
3168                 }
3169                 if (nodemap->nodes[j].pnn == pnn) {
3170                         continue;
3171                 }
3172
3173                 for (i=0; i<vnnmap->size; i++) {
3174                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3175                                 break;
3176                         }
3177                 }
3178                 if (i == vnnmap->size) {
3179                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3180                                   nodemap->nodes[j].pnn));
3181                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3182                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3183                         goto again;
3184                 }
3185         }
3186
3187         
3188         /* verify that all other nodes have the same vnnmap
3189            and are from the same generation
3190          */
3191         for (j=0; j<nodemap->num; j++) {
3192                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3193                         continue;
3194                 }
3195                 if (nodemap->nodes[j].pnn == pnn) {
3196                         continue;
3197                 }
3198
3199                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3200                                           mem_ctx, &remote_vnnmap);
3201                 if (ret != 0) {
3202                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3203                                   nodemap->nodes[j].pnn));
3204                         goto again;
3205                 }
3206
3207                 /* verify the vnnmap generation is the same */
3208                 if (vnnmap->generation != remote_vnnmap->generation) {
3209                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3210                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3211                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3212                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3213                         goto again;
3214                 }
3215
3216                 /* verify the vnnmap size is the same */
3217                 if (vnnmap->size != remote_vnnmap->size) {
3218                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3219                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3220                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3221                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3222                         goto again;
3223                 }
3224
3225                 /* verify the vnnmap is the same */
3226                 for (i=0;i<vnnmap->size;i++) {
3227                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3228                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3229                                           nodemap->nodes[j].pnn));
3230                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3231                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3232                                             vnnmap);
3233                                 goto again;
3234                         }
3235                 }
3236         }
3237
3238         /* we might need to change who has what IP assigned */
3239         if (rec->need_takeover_run) {
3240                 rec->need_takeover_run = false;
3241
3242                 /* execute the "startrecovery" event script on all nodes */
3243                 ret = run_startrecovery_eventscript(rec, nodemap);
3244                 if (ret!=0) {
3245                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3246                         ctdb_set_culprit(rec, ctdb->pnn);
3247                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3248                 }
3249
3250                 ret = ctdb_takeover_run(ctdb, nodemap);
3251                 if (ret != 0) {
3252                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3253                         ctdb_set_culprit(rec, ctdb->pnn);
3254                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3255                 }
3256
3257                 /* execute the "recovered" event script on all nodes */
3258                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3259 #if 0
3260 // we can't check whether the event completed successfully
3261 // since this script WILL fail if the node is in recovery mode
3262 // and if that race happens, the code here would just cause a second
3263 // cascading recovery.
3264                 if (ret!=0) {
3265                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3266                         ctdb_set_culprit(rec, ctdb->pnn);
3267                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3268                 }
3269 #endif
3270         }
3271
3272
3273         goto again;
3274
3275 }
3276
3277 /*
3278   event handler for when the main ctdbd dies
3279  */
3280 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3281                                  uint16_t flags, void *private_data)
3282 {
3283         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3284         _exit(1);
3285 }
3286
3287 /*
3288   called regularly to verify that the recovery daemon is still running
3289  */
3290 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3291                               struct timeval yt, void *p)
3292 {
3293         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3294
3295         if (kill(ctdb->recoverd_pid, 0) != 0) {
3296                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3297
3298                 ctdb_stop_recoverd(ctdb);
3299                 ctdb_stop_keepalive(ctdb);
3300                 ctdb_stop_monitoring(ctdb);
3301                 ctdb_release_all_ips(ctdb);
3302                 if (ctdb->methods != NULL) {
3303                         ctdb->methods->shutdown(ctdb);
3304                 }
3305                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3306
3307                 exit(10);       
3308         }
3309
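        /* schedule the next check of the recovery daemon in 30 seconds */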
3310         event_add_timed(ctdb->ev, ctdb, 
3311                         timeval_current_ofs(30, 0),
3312                         ctdb_check_recd, ctdb);
3313 }
3314
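/*
  SIGCHLD handler for the recovery daemon: reap any exited child
  processes so that they do not remain as zombies
 */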
3315 static void recd_sig_child_handler(struct event_context *ev,
3316         struct signal_event *se, int signum, int count,
3317         void *dont_care, 
3318         void *private_data)
3319 {
3320 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3321         int status;
3322         pid_t pid = -1;
3323
3324         while (pid != 0) {
3325                 pid = waitpid(-1, &status, WNOHANG);
3326                 if (pid == -1) {
3327                         if (errno != ECHILD) {
3328                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3329                         }
3330                         return;
3331                 }
3332                 if (pid > 0) {
3333                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3334                 }
3335         }
3336 }
3337
3338 /*
3339   start up the recovery daemon as a child of the main ctdb daemon
3340  */
3341 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3342 {
3343         int fd[2];
3344         struct signal_event *se;
3345
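        /* this pipe lets the recovery daemon notice when the main daemon
           dies: the child keeps the read end open and ctdb_recoverd_parent()
           terminates the child when that end reports EOF */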
3346         if (pipe(fd) != 0) {
3347                 return -1;
3348         }
3349
3350         ctdb->ctdbd_pid = getpid();
3351
3352         ctdb->recoverd_pid = fork();
3353         if (ctdb->recoverd_pid == -1) {
3354                 return -1;
3355         }
3356         
3357         if (ctdb->recoverd_pid != 0) {
3358                 close(fd[0]);
3359                 event_add_timed(ctdb->ev, ctdb, 
3360                                 timeval_current_ofs(30, 0),
3361                                 ctdb_check_recd, ctdb);
3362                 return 0;
3363         }
3364
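        /* child process: from here on this is the recovery daemon */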
3365         close(fd[1]);
3366
3367         srandom(getpid() ^ time(NULL));
3368
3369         if (switch_from_server_to_client(ctdb) != 0) {
3370                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3371                 exit(1);
3372         }
3373
3374         DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3375
3376         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3377                      ctdb_recoverd_parent, &fd[0]);     
3378
3379         /* set up a handler to pick up sigchld */
3380         se = event_add_signal(ctdb->ev, ctdb,
3381                                      SIGCHLD, 0,
3382                                      recd_sig_child_handler,
3383                                      ctdb);
3384         if (se == NULL) {
3385                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3386                 exit(1);
3387         }
3388
3389         monitor_cluster(ctdb);
3390
3391         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3392         return -1;
3393 }
3394
3395 /*
3396   shutdown the recovery daemon
3397  */
3398 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3399 {
3400         if (ctdb->recoverd_pid == 0) {
3401                 return;
3402         }
3403
3404         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3405         kill(ctdb->recoverd_pid, SIGTERM);
3406 }