Revert "Make fetch_locked more scalable"
[rusty/ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
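   /* per-node accounting of how many "culprit" credits a node has
      accumulated for causing failed recoveries, and when the last
      misbehaviour was reported */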
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67 };
68
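   /* control timeout and monitoring interval, derived from the
      recover_timeout and recover_interval tunables */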
69 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
70 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
71
72
73 /*
74   ban a node for a period of time
75  */
76 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
77 {
78         int ret;
79         struct ctdb_context *ctdb = rec->ctdb;
80         struct ctdb_ban_time bantime;
81        
82         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
83
84         if (!ctdb_validate_pnn(ctdb, pnn)) {
85                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
86                 return;
87         }
88
89         bantime.pnn  = pnn;
90         bantime.time = ban_time;
91
92         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
93         if (ret != 0) {
94                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
95                 return;
96         }
97
98 }
99
100 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
101
102
103 /*
104   run the "recovered" eventscript on all nodes
105  */
106 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
107 {
108         TALLOC_CTX *tmp_ctx;
109         uint32_t *nodes;
110
111         tmp_ctx = talloc_new(ctdb);
112         CTDB_NO_MEMORY(ctdb, tmp_ctx);
113
114         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
115         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
116                                         nodes, 0,
117                                         CONTROL_TIMEOUT(), false, tdb_null,
118                                         NULL, NULL,
119                                         NULL) != 0) {
120                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
121
122                 talloc_free(tmp_ctx);
123                 return -1;
124         }
125
126         talloc_free(tmp_ctx);
127         return 0;
128 }
129
130 /*
131   remember the trouble maker
132  */
133 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
134 {
135         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
136         struct ctdb_banning_state *ban_state;
137
138         if (culprit >= ctdb->num_nodes) {
139                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
140                 return;
141         }
142
143         if (ctdb->nodes[culprit]->ban_state == NULL) {
144                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
145                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
146
147                 
148         }
149         ban_state = ctdb->nodes[culprit]->ban_state;
150         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
151                 /* this was the first time in a long while this node
152                    misbehaved so we will forgive any old transgressions.
153                 */
154                 ban_state->count = 0;
155         }
156
157         ban_state->count += count;
158         ban_state->last_reported_time = timeval_current();
159         rec->last_culprit_node = culprit;
160 }
161
162 /*
163   remember the trouble maker
164  */
165 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
166 {
167         ctdb_set_culprit_count(rec, culprit, 1);
168 }
169
170
171 /* this callback is called for every node that failed to execute the
172    start recovery event
173 */
174 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
175 {
176         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
177
178         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
179
180         ctdb_set_culprit(rec, node_pnn);
181 }
182
183 /*
184   run the "startrecovery" eventscript on all nodes
185  */
186 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
187 {
188         TALLOC_CTX *tmp_ctx;
189         uint32_t *nodes;
190         struct ctdb_context *ctdb = rec->ctdb;
191
192         tmp_ctx = talloc_new(ctdb);
193         CTDB_NO_MEMORY(ctdb, tmp_ctx);
194
195         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
196         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
197                                         nodes, 0,
198                                         CONTROL_TIMEOUT(), false, tdb_null,
199                                         NULL,
200                                         startrecovery_fail_callback,
201                                         rec) != 0) {
202                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
203                 talloc_free(tmp_ctx);
204                 return -1;
205         }
206
207         talloc_free(tmp_ctx);
208         return 0;
209 }
210
211 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
212 {
213         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
214                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
215                 return;
216         }
217         if (node_pnn < ctdb->num_nodes) {
218                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
219         }
220 }
221
222 /*
223   update the node capabilities for all connected nodes
224  */
225 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
226 {
227         uint32_t *nodes;
228         TALLOC_CTX *tmp_ctx;
229
230         tmp_ctx = talloc_new(ctdb);
231         CTDB_NO_MEMORY(ctdb, tmp_ctx);
232
233         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
234         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
235                                         nodes, 0,
236                                         CONTROL_TIMEOUT(),
237                                         false, tdb_null,
238                                         async_getcap_callback, NULL,
239                                         NULL) != 0) {
240                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
241                 talloc_free(tmp_ctx);
242                 return -1;
243         }
244
245         talloc_free(tmp_ctx);
246         return 0;
247 }
248
249 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
250 {
251         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
252
253         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
254         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
255 }
256
257 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
258 {
259         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
260
261         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
262         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
263 }
264
265 /*
266   change recovery mode on all nodes
267  */
268 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
269 {
270         TDB_DATA data;
271         uint32_t *nodes;
272         TALLOC_CTX *tmp_ctx;
273
274         tmp_ctx = talloc_new(ctdb);
275         CTDB_NO_MEMORY(ctdb, tmp_ctx);
276
277         /* freeze all nodes */
278         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
279         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
280                 int i;
281
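                    /* freeze the databases one priority level at a time;
                       database priorities run from 1 to NUM_DB_PRIORITIES */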
282                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
283                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
284                                                 nodes, i,
285                                                 CONTROL_TIMEOUT(),
286                                                 false, tdb_null,
287                                                 NULL,
288                                                 set_recmode_fail_callback,
289                                                 rec) != 0) {
290                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
291                                 talloc_free(tmp_ctx);
292                                 return -1;
293                         }
294                 }
295         }
296
297
298         data.dsize = sizeof(uint32_t);
299         data.dptr = (unsigned char *)&rec_mode;
300
301         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
302                                         nodes, 0,
303                                         CONTROL_TIMEOUT(),
304                                         false, data,
305                                         NULL, NULL,
306                                         NULL) != 0) {
307                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
308                 talloc_free(tmp_ctx);
309                 return -1;
310         }
311
312         talloc_free(tmp_ctx);
313         return 0;
314 }
315
316 /*
317   change recovery master on all nodes
318  */
319 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
320 {
321         TDB_DATA data;
322         TALLOC_CTX *tmp_ctx;
323         uint32_t *nodes;
324
325         tmp_ctx = talloc_new(ctdb);
326         CTDB_NO_MEMORY(ctdb, tmp_ctx);
327
328         data.dsize = sizeof(uint32_t);
329         data.dptr = (unsigned char *)&pnn;
330
331         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
332         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
333                                         nodes, 0,
334                                         CONTROL_TIMEOUT(), false, data,
335                                         NULL, NULL,
336                                         NULL) != 0) {
337                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
338                 talloc_free(tmp_ctx);
339                 return -1;
340         }
341
342         talloc_free(tmp_ctx);
343         return 0;
344 }
345
346 /* update all remote nodes to use the same db priority that we have
347    this can fail if the remote node has not yet been upgraded to
348    support this function, so we always return success and never fail
349    a recovery if this call fails.
350 */
351 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
352         struct ctdb_node_map *nodemap, 
353         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
354 {
355         int db;
356         uint32_t *nodes;
357
358         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
359
360         /* step through all local databases */
361         for (db=0; db<dbmap->num;db++) {
362                 TDB_DATA data;
363                 struct ctdb_db_priority db_prio;
364                 int ret;
365
366                 db_prio.db_id     = dbmap->dbs[db].dbid;
367                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
368                 if (ret != 0) {
369                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
370                         continue;
371                 }
372
373                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
374
375                 data.dptr  = (uint8_t *)&db_prio;
376                 data.dsize = sizeof(db_prio);
377
378                 if (ctdb_client_async_control(ctdb,
379                                         CTDB_CONTROL_SET_DB_PRIORITY,
380                                         nodes, 0,
381                                         CONTROL_TIMEOUT(), false, data,
382                                         NULL, NULL,
383                                         NULL) != 0) {
384                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
385                 }
386         }
387
388         return 0;
389 }                       
390
391 /*
392   ensure all other nodes have attached to any databases that we have
393  */
394 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
395                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
396 {
397         int i, j, db, ret;
398         struct ctdb_dbid_map *remote_dbmap;
399
400         /* verify that all other nodes have all our databases */
401         for (j=0; j<nodemap->num; j++) {
402                 /* we don't need to check ourselves */
403                 if (nodemap->nodes[j].pnn == pnn) {
404                         continue;
405                 }
406                 /* don't check nodes that are unavailable */
407                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
408                         continue;
409                 }
410
411                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
412                                          mem_ctx, &remote_dbmap);
413                 if (ret != 0) {
414                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
415                         return -1;
416                 }
417
418                 /* step through all local databases */
419                 for (db=0; db<dbmap->num;db++) {
420                         const char *name;
421
422
423                         for (i=0;i<remote_dbmap->num;i++) {
424                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
425                                         break;
426                                 }
427                         }
428                         /* the remote node already has this database */
429                         if (i!=remote_dbmap->num) {
430                                 continue;
431                         }
432                         /* ok so we need to create this database */
433                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
434                                                   mem_ctx, &name);
435                         if (ret != 0) {
436                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
437                                 return -1;
438                         }
439                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
440                                                  mem_ctx, name, dbmap->dbs[db].persistent);
441                         if (ret != 0) {
442                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
443                                 return -1;
444                         }
445                 }
446         }
447
448         return 0;
449 }
450
451
452 /*
453   ensure we are attached to any databases that anyone else is attached to
454  */
455 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
456                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
457 {
458         int i, j, db, ret;
459         struct ctdb_dbid_map *remote_dbmap;
460
461         /* verify that we have all databases any other node has */
462         for (j=0; j<nodemap->num; j++) {
463                 /* we don't need to check ourselves */
464                 if (nodemap->nodes[j].pnn == pnn) {
465                         continue;
466                 }
467                 /* don't check nodes that are unavailable */
468                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
469                         continue;
470                 }
471
472                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
473                                          mem_ctx, &remote_dbmap);
474                 if (ret != 0) {
475                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
476                         return -1;
477                 }
478
479                 /* step through all databases on the remote node */
480                 for (db=0; db<remote_dbmap->num;db++) {
481                         const char *name;
482
483                         for (i=0;i<(*dbmap)->num;i++) {
484                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
485                                         break;
486                                 }
487                         }
488                         /* we already have this db locally */
489                         if (i!=(*dbmap)->num) {
490                                 continue;
491                         }
492                         /* ok so we need to create this database and
493                            rebuild dbmap
494                          */
495                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
496                                                   remote_dbmap->dbs[db].dbid, mem_ctx, &name);
497                         if (ret != 0) {
498                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
499                                           nodemap->nodes[j].pnn));
500                                 return -1;
501                         }
502                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
503                                                  remote_dbmap->dbs[db].persistent);
504                         if (ret != 0) {
505                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
506                                 return -1;
507                         }
508                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
509                         if (ret != 0) {
510                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
511                                 return -1;
512                         }
513                 }
514         }
515
516         return 0;
517 }
518
519
520 /*
521   pull the remote database contents from one node into the recdb
522  */
523 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
524                                     struct tdb_wrap *recdb, uint32_t dbid,
525                                     bool persistent)
526 {
527         int ret;
528         TDB_DATA outdata;
529         struct ctdb_marshall_buffer *reply;
530         struct ctdb_rec_data *rec;
531         int i;
532         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
533
534         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
535                                CONTROL_TIMEOUT(), &outdata);
536         if (ret != 0) {
537                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
538                 talloc_free(tmp_ctx);
539                 return -1;
540         }
541
542         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
543
544         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
545                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
546                 talloc_free(tmp_ctx);
547                 return -1;
548         }
549         
550         rec = (struct ctdb_rec_data *)&reply->data[0];
551         
552         for (i=0;
553              i<reply->count;
554              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
555                 TDB_DATA key, data;
556                 struct ctdb_ltdb_header *hdr;
557                 TDB_DATA existing;
558                 
559                 key.dptr = &rec->data[0];
560                 key.dsize = rec->keylen;
561                 data.dptr = &rec->data[key.dsize];
562                 data.dsize = rec->datalen;
563                 
564                 hdr = (struct ctdb_ltdb_header *)data.dptr;
565
566                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
567                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
568                         talloc_free(tmp_ctx);
569                         return -1;
570                 }
571
572                 /* fetch the existing record, if any */
573                 existing = tdb_fetch(recdb->tdb, key);
574                 
575                 if (existing.dptr != NULL) {
576                         struct ctdb_ltdb_header header;
577                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
578                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
579                                          (unsigned)existing.dsize, srcnode));
580                                 free(existing.dptr);
581                                 talloc_free(tmp_ctx);
582                                 return -1;
583                         }
584                         header = *(struct ctdb_ltdb_header *)existing.dptr;
585                         free(existing.dptr);
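                            /* keep the existing record unless the pulled copy
                               has a higher rsn, or the same rsn while the
                               existing dmaster is not the recovery master */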
586                         if (!(header.rsn < hdr->rsn ||
587                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
588                                 continue;
589                         }
590                 }
591                 
592                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
593                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
594                         talloc_free(tmp_ctx);
595                         return -1;                              
596                 }
597         }
598
599         talloc_free(tmp_ctx);
600
601         return 0;
602 }
603
604 /*
605   pull all the remote database contents into the recdb
606  */
607 static int pull_remote_database(struct ctdb_context *ctdb,
608                                 struct ctdb_recoverd *rec, 
609                                 struct ctdb_node_map *nodemap, 
610                                 struct tdb_wrap *recdb, uint32_t dbid,
611                                 bool persistent)
612 {
613         int j;
614
615         /* pull all records from all other nodes across onto this node
616            (this merges based on rsn)
617         */
618         for (j=0; j<nodemap->num; j++) {
619                 /* don't merge from nodes that are unavailable */
620                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
621                         continue;
622                 }
623                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
624                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
625                                  nodemap->nodes[j].pnn));
626                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
627                         return -1;
628                 }
629         }
630         
631         return 0;
632 }
633
634
635 /*
636   update flags on all active nodes
637  */
638 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
639 {
640         int ret;
641
642         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
643         if (ret != 0) {
644                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
645                 return -1;
646         }
647
648         return 0;
649 }
650
651 /*
652   ensure all nodes have the same vnnmap we do
653  */
654 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
655                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
656 {
657         int j, ret;
658
659         /* push the new vnn map out to all the nodes */
660         for (j=0; j<nodemap->num; j++) {
661                 /* don't push to nodes that are unavailable */
662                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
663                         continue;
664                 }
665
666                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
667                 if (ret != 0) {
668                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
669                         return -1;
670                 }
671         }
672
673         return 0;
674 }
675
676
677 struct vacuum_info {
678         struct vacuum_info *next, *prev;
679         struct ctdb_recoverd *rec;
680         uint32_t srcnode;
681         struct ctdb_db_context *ctdb_db;
682         struct ctdb_marshall_buffer *recs;
683         struct ctdb_rec_data *r;
684 };
685
686 static void vacuum_fetch_next(struct vacuum_info *v);
687
688 /*
689   called when a vacuum fetch has completed - just free it and do the next one
690  */
691 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
692 {
693         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
694         talloc_free(state);
695         vacuum_fetch_next(v);
696 }
697
698
699 /*
700   process the next element from the vacuum list
701 */
702 static void vacuum_fetch_next(struct vacuum_info *v)
703 {
704         struct ctdb_call call;
705         struct ctdb_rec_data *r;
706
707         while (v->recs->count) {
708                 struct ctdb_client_call_state *state;
709                 TDB_DATA data;
710                 struct ctdb_ltdb_header *hdr;
711
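                    /* a NULL function call with CTDB_IMMEDIATE_MIGRATION just
                       migrates the record to this node without invoking any
                       call function */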
712                 ZERO_STRUCT(call);
713                 call.call_id = CTDB_NULL_FUNC;
714                 call.flags = CTDB_IMMEDIATE_MIGRATION;
715
716                 r = v->r;
717                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
718                 v->recs->count--;
719
720                 call.key.dptr = &r->data[0];
721                 call.key.dsize = r->keylen;
722
723                 /* ensure we don't block this daemon - just skip a record if we can't get
724                    the chainlock */
725                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
726                         continue;
727                 }
728
729                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
730                 if (data.dptr == NULL) {
731                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
732                         continue;
733                 }
734
735                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
736                         free(data.dptr);
737                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
738                         continue;
739                 }
740                 
741                 hdr = (struct ctdb_ltdb_header *)data.dptr;
742                 if (hdr->dmaster == v->rec->ctdb->pnn) {
743                         /* it's already local */
744                         free(data.dptr);
745                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
746                         continue;
747                 }
748
749                 free(data.dptr);
750
751                 state = ctdb_call_send(v->ctdb_db, &call);
752                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
753                 if (state == NULL) {
754                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
755                         talloc_free(v);
756                         return;
757                 }
758                 state->async.fn = vacuum_fetch_callback;
759                 state->async.private_data = v;
760                 return;
761         }
762
763         talloc_free(v);
764 }
765
766
767 /*
768   destroy a vacuum info structure
769  */
770 static int vacuum_info_destructor(struct vacuum_info *v)
771 {
772         DLIST_REMOVE(v->rec->vacuum_info, v);
773         return 0;
774 }
775
776
777 /*
778   handler for vacuum fetch
779 */
780 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
781                                  TDB_DATA data, void *private_data)
782 {
783         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
784         struct ctdb_marshall_buffer *recs;
785         int ret, i;
786         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
787         const char *name;
788         struct ctdb_dbid_map *dbmap=NULL;
789         bool persistent = false;
790         struct ctdb_db_context *ctdb_db;
791         struct ctdb_rec_data *r;
792         uint32_t srcnode;
793         struct vacuum_info *v;
794
795         recs = (struct ctdb_marshall_buffer *)data.dptr;
796         r = (struct ctdb_rec_data *)&recs->data[0];
797
798         if (recs->count == 0) {
799                 talloc_free(tmp_ctx);
800                 return;
801         }
802
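            /* the pnn of the node that sent these records is carried in the
               reqid field of the first marshalled record */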
803         srcnode = r->reqid;
804
805         for (v=rec->vacuum_info;v;v=v->next) {
806                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
807                         /* we're already working on records from this node */
808                         talloc_free(tmp_ctx);
809                         return;
810                 }
811         }
812
813         /* work out if the database is persistent */
814         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
815         if (ret != 0) {
816                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
817                 talloc_free(tmp_ctx);
818                 return;
819         }
820
821         for (i=0;i<dbmap->num;i++) {
822                 if (dbmap->dbs[i].dbid == recs->db_id) {
823                         persistent = dbmap->dbs[i].persistent;
824                         break;
825                 }
826         }
827         if (i == dbmap->num) {
828                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
829                 talloc_free(tmp_ctx);
830                 return;         
831         }
832
833         /* find the name of this database */
834         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
835                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
836                 talloc_free(tmp_ctx);
837                 return;
838         }
839
840         /* attach to it */
841         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
842         if (ctdb_db == NULL) {
843                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
844                 talloc_free(tmp_ctx);
845                 return;
846         }
847
848         v = talloc_zero(rec, struct vacuum_info);
849         if (v == NULL) {
850                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
851                 talloc_free(tmp_ctx);
852                 return;
853         }
854
855         v->rec = rec;
856         v->srcnode = srcnode;
857         v->ctdb_db = ctdb_db;
858         v->recs = talloc_memdup(v, recs, data.dsize);
859         if (v->recs == NULL) {
860                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
861                 talloc_free(v);
862                 talloc_free(tmp_ctx);
863                 return;         
864         }
865         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
866
867         DLIST_ADD(rec->vacuum_info, v);
868
869         talloc_set_destructor(v, vacuum_info_destructor);
870
871         vacuum_fetch_next(v);
872         talloc_free(tmp_ctx);
873 }
874
875
876 /*
877   called when ctdb_wait_timeout should finish
878  */
879 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
880                               struct timeval yt, void *p)
881 {
882         uint32_t *timed_out = (uint32_t *)p;
883         (*timed_out) = 1;
884 }
885
886 /*
887   wait for a given number of seconds
888  */
889 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
890 {
891         uint32_t timed_out = 0;
892         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
893         while (!timed_out) {
894                 event_loop_once(ctdb->ev);
895         }
896 }
897
898 /*
899   called when an election times out (ends)
900  */
901 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
902                                   struct timeval t, void *p)
903 {
904         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
905         rec->election_timeout = NULL;
906
907         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
908 }
909
910
911 /*
912   wait for an election to finish. It finishes election_timeout seconds after
913   the last election packet is received
914  */
915 static void ctdb_wait_election(struct ctdb_recoverd *rec)
916 {
917         struct ctdb_context *ctdb = rec->ctdb;
918         while (rec->election_timeout) {
919                 event_loop_once(ctdb->ev);
920         }
921 }
922
923 /*
924   Update our local flags from all remote connected nodes. 
925   This is only run when we are, or believe we are, the recovery master
926  */
927 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
928 {
929         int j;
930         struct ctdb_context *ctdb = rec->ctdb;
931         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
932
933         /* get the nodemap for all active remote nodes and verify
934            they are the same as for this node
935          */
936         for (j=0; j<nodemap->num; j++) {
937                 struct ctdb_node_map *remote_nodemap=NULL;
938                 int ret;
939
940                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
941                         continue;
942                 }
943                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
944                         continue;
945                 }
946
947                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
948                                            mem_ctx, &remote_nodemap);
949                 if (ret != 0) {
950                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
951                                   nodemap->nodes[j].pnn));
952                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
953                         talloc_free(mem_ctx);
954                         return MONITOR_FAILED;
955                 }
956                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
957                         /* We should tell our daemon about this so it
958                            updates its flags or else we will log the same 
959                            message again in the next iteration of recovery.
960                            Since we are the recovery master we can just as
961                            well update the flags on all nodes.
962                         */
963                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
964                         if (ret != 0) {
965                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                                    talloc_free(mem_ctx);
966                                 return MONITOR_FAILED;
967                         }
968
969                         /* Update our local copy of the flags in the recovery
970                            daemon.
971                         */
972                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
973                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
974                                  nodemap->nodes[j].flags));
975                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
976                 }
977                 talloc_free(remote_nodemap);
978         }
979         talloc_free(mem_ctx);
980         return MONITOR_OK;
981 }
982
983
984 /* Create a new random generation id.
985    The generation id cannot be INVALID_GENERATION.
986 */
987 static uint32_t new_generation(void)
988 {
989         uint32_t generation;
990
991         while (1) {
992                 generation = random();
993
994                 if (generation != INVALID_GENERATION) {
995                         break;
996                 }
997         }
998
999         return generation;
1000 }
1001
1002
1003 /*
1004   create a temporary working database
1005  */
1006 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1007 {
1008         char *name;
1009         struct tdb_wrap *recdb;
1010         unsigned tdb_flags;
1011
1012         /* open up the temporary recovery database */
1013         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
1014         if (name == NULL) {
1015                 return NULL;
1016         }
1017         unlink(name);
1018
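             /* the recdb is only used by the recovery daemon during this
                recovery run, so tdb locking is not needed */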
1019         tdb_flags = TDB_NOLOCK;
1020         if (!ctdb->do_setsched) {
1021                 tdb_flags |= TDB_NOMMAP;
1022         }
1023
1024         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1025                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1026         if (recdb == NULL) {
1027                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1028         }
1029
1030         talloc_free(name);
1031
1032         return recdb;
1033 }
1034
1035
1036 /* 
1037    a traverse function for pulling all relevant records from recdb
1038  */
1039 struct recdb_data {
1040         struct ctdb_context *ctdb;
1041         struct ctdb_marshall_buffer *recdata;
1042         uint32_t len;
1043         bool failed;
1044         bool persistent;
1045 };
1046
1047 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1048 {
1049         struct recdb_data *params = (struct recdb_data *)p;
1050         struct ctdb_rec_data *rec;
1051         struct ctdb_ltdb_header *hdr;
1052
1053         /* skip empty records */
1054         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1055                 return 0;
1056         }
1057
1058         /* update the dmaster field to point to us */
1059         hdr = (struct ctdb_ltdb_header *)data.dptr;
1060         if (!params->persistent) {
1061                 hdr->dmaster = params->ctdb->pnn;
1062         }
1063
1064         /* add the record to the blob ready to send to the nodes */
1065         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1066         if (rec == NULL) {
1067                 params->failed = true;
1068                 return -1;
1069         }
1070         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1071         if (params->recdata == NULL) {
1072                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1073                          rec->length + params->len, params->recdata->count));
1074                 params->failed = true;
1075                 return -1;
1076         }
1077         params->recdata->count++;
1078         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1079         params->len += rec->length;
1080         talloc_free(rec);
1081
1082         return 0;
1083 }
1084
1085 /*
1086   push the recdb database out to all nodes
1087  */
1088 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1089                                bool persistent,
1090                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1091 {
1092         struct recdb_data params;
1093         struct ctdb_marshall_buffer *recdata;
1094         TDB_DATA outdata;
1095         TALLOC_CTX *tmp_ctx;
1096         uint32_t *nodes;
1097
1098         tmp_ctx = talloc_new(ctdb);
1099         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1100
1101         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1102         CTDB_NO_MEMORY(ctdb, recdata);
1103
1104         recdata->db_id = dbid;
1105
1106         params.ctdb = ctdb;
1107         params.recdata = recdata;
1108         params.len = offsetof(struct ctdb_marshall_buffer, data);
1109         params.failed = false;
1110         params.persistent = persistent;
1111
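             /* walk every record in the recdb and marshall it into recdata;
                traverse_recdb grows params.len as it appends each record */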
1112         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1113                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1114                 talloc_free(params.recdata);
1115                 talloc_free(tmp_ctx);
1116                 return -1;
1117         }
1118
1119         if (params.failed) {
1120                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1121                 talloc_free(params.recdata);
1122                 talloc_free(tmp_ctx);
1123                 return -1;              
1124         }
1125
1126         recdata = params.recdata;
1127
1128         outdata.dptr = (void *)recdata;
1129         outdata.dsize = params.len;
1130
1131         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1132         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1133                                         nodes, 0,
1134                                         CONTROL_TIMEOUT(), false, outdata,
1135                                         NULL, NULL,
1136                                         NULL) != 0) {
1137                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1138                 talloc_free(recdata);
1139                 talloc_free(tmp_ctx);
1140                 return -1;
1141         }
1142
1143         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x (%u records)\n", 
1144                   dbid, recdata->count));
1145
1146         talloc_free(recdata);
1147         talloc_free(tmp_ctx);
1148
1149         return 0;
1150 }
1151
1152
1153 /*
1154   go through a full recovery on one database 
1155  */
1156 static int recover_database(struct ctdb_recoverd *rec, 
1157                             TALLOC_CTX *mem_ctx,
1158                             uint32_t dbid,
1159                             bool persistent,
1160                             uint32_t pnn, 
1161                             struct ctdb_node_map *nodemap,
1162                             uint32_t transaction_id)
1163 {
1164         struct tdb_wrap *recdb;
1165         int ret;
1166         struct ctdb_context *ctdb = rec->ctdb;
1167         TDB_DATA data;
1168         struct ctdb_control_wipe_database w;
1169         uint32_t *nodes;
1170
1171         recdb = create_recdb(ctdb, mem_ctx);
1172         if (recdb == NULL) {
1173                 return -1;
1174         }
1175
1176         /* pull all remote databases onto the recdb */
1177         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1178         if (ret != 0) {
1179                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1180                 return -1;
1181         }
1182
1183         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1184
1185         /* wipe all the remote databases. This is safe as we are in a transaction */
1186         w.db_id = dbid;
1187         w.transaction_id = transaction_id;
1188
1189         data.dptr = (void *)&w;
1190         data.dsize = sizeof(w);
1191
1192         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1193         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1194                                         nodes, 0,
1195                                         CONTROL_TIMEOUT(), false, data,
1196                                         NULL, NULL,
1197                                         NULL) != 0) {
1198                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1199                 talloc_free(recdb);
1200                 return -1;
1201         }
1202         
1203         /* push out the correct database. This sets the dmaster and skips 
1204            the empty records */
1205         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1206         if (ret != 0) {
1207                 talloc_free(recdb);
1208                 return -1;
1209         }
1210
1211         /* all done with this database */
1212         talloc_free(recdb);
1213
1214         return 0;
1215 }
1216
1217 /*
1218   reload the nodes file 
1219 */
1220 static void reload_nodes_file(struct ctdb_context *ctdb)
1221 {
1222         ctdb->nodes = NULL;
1223         ctdb_load_nodes_file(ctdb);
1224 }
1225
1226         
1227 /*
1228   we are the recmaster, and recovery is needed - start a recovery run
1229  */
1230 static int do_recovery(struct ctdb_recoverd *rec, 
1231                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1232                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1233 {
1234         struct ctdb_context *ctdb = rec->ctdb;
1235         int i, j, ret;
1236         uint32_t generation;
1237         struct ctdb_dbid_map *dbmap;
1238         TDB_DATA data;
1239         uint32_t *nodes;
1240         struct timeval start_time;
1241
1242         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1243
1244         /* if recovery fails, force it again */
1245         rec->need_recovery = true;
1246
1247         for (i=0; i<ctdb->num_nodes; i++) {
1248                 struct ctdb_banning_state *ban_state;
1249
1250                 if (ctdb->nodes[i]->ban_state == NULL) {
1251                         continue;
1252                 }
1253                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
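                     /* ban the node once it has accumulated culprit credits
                        equal to twice the number of nodes in the cluster */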
1254                 if (ban_state->count < 2*ctdb->num_nodes) {
1255                         continue;
1256                 }
1257                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1258                         ctdb->nodes[i]->pnn, ban_state->count,
1259                         ctdb->tunable.recovery_ban_period));
1260                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1261                 ban_state->count = 0;
1262         }
1263
1264
1265         if (ctdb->tunable.verify_recovery_lock != 0) {
1266                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1267                 start_time = timeval_current();
1268                 if (!ctdb_recovery_lock(ctdb, true)) {
1269                         ctdb_set_culprit(rec, pnn);
1270                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1271                         return -1;
1272                 }
1273                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1274                 DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1275         }
1276
1277         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1278
1279         /* get a list of all databases */
1280         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1281         if (ret != 0) {
1282                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
1283                 return -1;
1284         }
1285
1286         /* we do the db creation before we set the recovery mode, so the freeze happens
1287            on all databases we will be dealing with. */
1288
1289         /* verify that we have all the databases any other node has */
1290         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1291         if (ret != 0) {
1292                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1293                 return -1;
1294         }
1295
1296         /* verify that all other nodes have all our databases */
1297         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1298         if (ret != 0) {
1299                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1300                 return -1;
1301         }
1302         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1303
1304         /* update the database priority for all remote databases */
1305         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1306         if (ret != 0) {
1307                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1308         }
1309         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1310
1311
1312         /* set recovery mode to active on all nodes */
1313         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1314         if (ret != 0) {
1315                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1316                 return -1;
1317         }
1318
1319         /* execute the "startrecovery" event script on all nodes */
1320         ret = run_startrecovery_eventscript(rec, nodemap);
1321         if (ret!=0) {
1322                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1323                 return -1;
1324         }
1325
1326         /* pick a new generation number */
1327         generation = new_generation();
1328
1329         /* change the vnnmap on this node to use the new generation 
1330            number but not on any other nodes.
1331            this guarantees that if we abort the recovery prematurely
1332            for some reason (a node stops responding?)
1333            that we can just return immediately and we will reenter
1334            recovery shortly again.
1335            I.e. we deliberately leave the cluster with an inconsistent
1336            generation id to allow us to abort recovery at any stage and
1337            just restart it from scratch.
1338          */
1339         vnnmap->generation = generation;
1340         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1341         if (ret != 0) {
1342                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1343                 return -1;
1344         }
1345
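             /* the new generation number also serves as the transaction id
                for the recovery transaction started on all active nodes */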
1346         data.dptr = (void *)&generation;
1347         data.dsize = sizeof(uint32_t);
1348
1349         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1350         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1351                                         nodes, 0,
1352                                         CONTROL_TIMEOUT(), false, data,
1353                                         NULL,
1354                                         transaction_start_fail_callback,
1355                                         rec) != 0) {
1356                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1357                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1358                                         nodes, 0,
1359                                         CONTROL_TIMEOUT(), false, tdb_null,
1360                                         NULL,
1361                                         NULL,
1362                                         NULL) != 0) {
1363                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1364                 }
1365                 return -1;
1366         }
1367
1368         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1369
1370         for (i=0;i<dbmap->num;i++) {
1371                 ret = recover_database(rec, mem_ctx,
1372                                        dbmap->dbs[i].dbid,
1373                                        dbmap->dbs[i].persistent,
1374                                        pnn, nodemap, generation);
1375                 if (ret != 0) {
1376                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1377                         return -1;
1378                 }
1379         }
1380
1381         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1382
1383         /* commit all the changes */
1384         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1385                                         nodes, 0,
1386                                         CONTROL_TIMEOUT(), false, data,
1387                                         NULL, NULL,
1388                                         NULL) != 0) {
1389                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1390                 return -1;
1391         }
1392
1393         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1394         
1395
1396         /* update the capabilities for all nodes */
1397         ret = update_capabilities(ctdb, nodemap);
1398         if (ret!=0) {
1399                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1400                 return -1;
1401         }
1402
1403         /* build a new vnn map with all the currently active and
1404            unbanned nodes that have the lmaster capability */
1405         generation = new_generation();
1406         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1407         CTDB_NO_MEMORY(ctdb, vnnmap);
1408         vnnmap->generation = generation;
1409         vnnmap->size = 0;
1410         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1411         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1412         for (i=j=0;i<nodemap->num;i++) {
1413                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1414                         continue;
1415                 }
1416                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1417                         /* this node can not be an lmaster */
1418                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1419                         continue;
1420                 }
1421
1422                 vnnmap->size++;
1423                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1424                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1425                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1426
1427         }
1428         if (vnnmap->size == 0) {
1429                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1430                 vnnmap->size++;
1431                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1432                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1433                 vnnmap->map[0] = pnn;
1434         }       
1435
1436         /* update to the new vnnmap on all nodes */
1437         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1438         if (ret != 0) {
1439                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1440                 return -1;
1441         }
1442
1443         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1444
1445         /* update recmaster to point to us for all nodes */
1446         ret = set_recovery_master(ctdb, nodemap, pnn);
1447         if (ret!=0) {
1448                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1449                 return -1;
1450         }
1451
1452         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1453
1454         /*
1455           update all nodes to have the same flags that we have
1456          */
1457         for (i=0;i<nodemap->num;i++) {
1458                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1459                         continue;
1460                 }
1461
1462                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1463                 if (ret != 0) {
1464                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1465                         return -1;
1466                 }
1467         }
1468
1469         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1470
1471         /* disable recovery mode */
1472         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1473         if (ret != 0) {
1474                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1475                 return -1;
1476         }
1477
1478         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1479
1480         /*
1481           tell nodes to takeover their public IPs
1482          */
1483         rec->need_takeover_run = false;
1484         ret = ctdb_takeover_run(ctdb, nodemap);
1485         if (ret != 0) {
1486                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1487                 return -1;
1488         }
1489         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1490
1491         /* execute the "recovered" event script on all nodes */
1492         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1493         if (ret!=0) {
1494                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1495                 return -1;
1496         }
1497
1498         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1499
1500         /* send a message to all clients telling them that the cluster 
1501            has been reconfigured */
1502         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1503
1504         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1505
1506         rec->need_recovery = false;
1507
1508         /* we managed to complete a full recovery, make sure to forgive
1509            any past sins by the nodes that could now participate in the
1510            recovery.
1511         */
1512         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1513         for (i=0;i<nodemap->num;i++) {
1514                 struct ctdb_banning_state *ban_state;
1515
1516                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1517                         continue;
1518                 }
1519
1520                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1521                 if (ban_state == NULL) {
1522                         continue;
1523                 }
1524
1525                 ban_state->count = 0;
1526         }
1527
1528
1529         /* We just finished a recovery successfully. 
1530            We now wait for rerecovery_timeout before we allow 
1531            another recovery to take place.
1532         */
1533         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries suppressed for the rerecovery timeout\n"));
1534         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1535         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1536
1537         return 0;
1538 }
1539
1540
1541 /*
1542   elections are won by first checking the number of connected nodes, then
1543   the priority time, then the pnn
1544  */
1545 struct election_message {
1546         uint32_t num_connected;
1547         struct timeval priority_time;
1548         uint32_t pnn;
1549         uint32_t node_flags;
1550 };
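
/*
  worked example of the ordering implemented in ctdb_election_win()
  below: a candidate that can see 4 connected nodes beats one that can
  only see 3, regardless of priority_time or pnn.  with equal
  connectivity the node whose recovery daemon started earlier (the
  smaller priority_time, i.e. the longest running recoverd) wins, and
  only if that also matches does the comparison fall back to the pnn.
*/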
1551
1552 /*
1553   form this node's election data
1554  */
1555 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1556 {
1557         int ret, i;
1558         struct ctdb_node_map *nodemap;
1559         struct ctdb_context *ctdb = rec->ctdb;
1560
1561         ZERO_STRUCTP(em);
1562
1563         em->pnn = rec->ctdb->pnn;
1564         em->priority_time = rec->priority_time;
1565
1566         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1567         if (ret != 0) {
1568                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1569                 return;
1570         }
1571
1572         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1573         em->node_flags = rec->node_flags;
1574
1575         for (i=0;i<nodemap->num;i++) {
1576                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1577                         em->num_connected++;
1578                 }
1579         }
1580
1581         /* we shouldn't try to win this election if we can't be a recmaster */
1582         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1583                 em->num_connected = 0;
1584                 em->priority_time = timeval_current();
1585         }
1586
1587         talloc_free(nodemap);
1588 }
1589
1590 /*
1591   see if the given election data wins
1592  */
1593 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1594 {
1595         struct election_message myem;
1596         int cmp = 0;
1597
1598         ctdb_election_data(rec, &myem);
1599
1600         /* we can't win if we don't have the recmaster capability */
1601         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1602                 return false;
1603         }
1604
1605         /* we can't win if we are banned */
1606         if (rec->node_flags & NODE_FLAGS_BANNED) {
1607                 return false;
1608         }
1609
1610         /* we can't win if we are stopped */
1611         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1612                 return false;
1613         }
1614
1615         /* we will automatically win if the other node is banned */
1616         if (em->node_flags & NODE_FLAGS_BANNED) {
1617                 return true;
1618         }
1619
1620         /* we will automatically win if the other node is stopped */
1621         if (em->node_flags & NODE_FLAGS_STOPPED) {
1622                 return true;
1623         }
1624
1625         /* try to use the most connected node */
1626         if (cmp == 0) {
1627                 cmp = (int)myem.num_connected - (int)em->num_connected;
1628         }
1629
1630         /* then the longest running node */
1631         if (cmp == 0) {
1632                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1633         }
1634
1635         if (cmp == 0) {
1636                 cmp = (int)myem.pnn - (int)em->pnn;
1637         }
1638
1639         return cmp > 0;
1640 }
1641
1642 /*
1643   send out an election request
1644  */
1645 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1646 {
1647         int ret;
1648         TDB_DATA election_data;
1649         struct election_message emsg;
1650         uint64_t srvid;
1651         struct ctdb_context *ctdb = rec->ctdb;
1652
1653         srvid = CTDB_SRVID_RECOVERY;
1654
1655         ctdb_election_data(rec, &emsg);
1656
1657         election_data.dsize = sizeof(struct election_message);
1658         election_data.dptr  = (unsigned char *)&emsg;
1659
1660
1661         /* send an election message to all active nodes */
1662         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1663         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1664
1665
1666         /* A new node that is already frozen has entered the cluster.
1667            The existing nodes are not frozen and don't need to be frozen
1668            until the election has ended and we start the actual recovery
1669         */
1670         if (update_recmaster == true) {
1671                 /* first we assume we will win the election and set 
1672                    recoverymaster to be ourself on the current node
1673                  */
1674                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1675                 if (ret != 0) {
1676                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1677                         return -1;
1678                 }
1679         }
1680
1681
1682         return 0;
1683 }
1684
1685 /*
1686   this function will unban all nodes in the cluster
1687 */
1688 static void unban_all_nodes(struct ctdb_context *ctdb)
1689 {
1690         int ret, i;
1691         struct ctdb_node_map *nodemap;
1692         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1693         
1694         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1695         if (ret != 0) {
1696                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1697                 return;
1698         }
1699
1700         for (i=0;i<nodemap->num;i++) {
1701                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1702                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1703                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1704                 }
1705         }
1706
1707         talloc_free(tmp_ctx);
1708 }
1709
1710
1711 /*
1712   we think we are winning the election - send a broadcast election request
1713  */
1714 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1715 {
1716         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1717         int ret;
1718
1719         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1720         if (ret != 0) {
1721                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1722         }
1723
1724         talloc_free(rec->send_election_te);
1725         rec->send_election_te = NULL;
1726 }
1727
1728 /*
1729   handler for memory dumps
1730 */
1731 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1732                              TDB_DATA data, void *private_data)
1733 {
1734         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1735         TDB_DATA *dump;
1736         int ret;
1737         struct rd_memdump_reply *rd;
1738
1739         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1740                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1741                 talloc_free(tmp_ctx);
1742                 return;
1743         }
1744         rd = (struct rd_memdump_reply *)data.dptr;
1745
1746         dump = talloc_zero(tmp_ctx, TDB_DATA);
1747         if (dump == NULL) {
1748                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1749                 talloc_free(tmp_ctx);
1750                 return;
1751         }
1752         ret = ctdb_dump_memory(ctdb, dump);
1753         if (ret != 0) {
1754                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1755                 talloc_free(tmp_ctx);
1756                 return;
1757         }
1758
1759         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1760
1761         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1762         if (ret != 0) {
1763                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1764                 talloc_free(tmp_ctx);
1765                 return;
1766         }
1767
1768         talloc_free(tmp_ctx);
1769 }
1770
1771 /*
1772   handler for reload_nodes
1773 */
1774 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1775                              TDB_DATA data, void *private_data)
1776 {
1777         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1778
1779         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1780
1781         reload_nodes_file(rec->ctdb);
1782 }
1783
1784
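/*
  handlers for temporarily disabling the public ip address check:
  the check is suppressed for as long as rec->ip_check_disable_ctx
  exists, and the timed event below frees it again once the requested
  timeout has expired
*/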
1785 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1786                               struct timeval yt, void *p)
1787 {
1788         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1789
1790         talloc_free(rec->ip_check_disable_ctx);
1791         rec->ip_check_disable_ctx = NULL;
1792 }
1793
1794 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1795                              TDB_DATA data, void *private_data)
1796 {
1797         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1798         uint32_t timeout;
1799
1800         if (rec->ip_check_disable_ctx != NULL) {
1801                 talloc_free(rec->ip_check_disable_ctx);
1802                 rec->ip_check_disable_ctx = NULL;
1803         }
1804
1805         if (data.dsize != sizeof(uint32_t)) {
1806                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1807                                  "expecting %lu\n", (long unsigned)data.dsize,
1808                                  (long unsigned)sizeof(uint32_t)));
1809                 return;
1810         }
1811         if (data.dptr == NULL) {
1812                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1813                 return;
1814         }
1815
1816         timeout = *((uint32_t *)data.dptr);
1817         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1818
1819         rec->ip_check_disable_ctx = talloc_new(rec);
1820         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1821
1822         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1823 }
1824
1825
1826 /*
1827   handler for ip reallocate, just add it to the list of callers and 
1828   handle this later in the monitor_cluster loop so we do not recurse
1829   with other callers to takeover_run()
1830 */
1831 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1832                              TDB_DATA data, void *private_data)
1833 {
1834         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1835         struct ip_reallocate_list *caller;
1836
1837         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1838                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1839                 return;
1840         }
1841
1842         if (rec->ip_reallocate_ctx == NULL) {
1843                 rec->ip_reallocate_ctx = talloc_new(rec);
1844                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
1845         }
1846
1847         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1848         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1849
1850         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1851         caller->next = rec->reallocate_callers;
1852         rec->reallocate_callers = caller;
1853
1854         return;
1855 }
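
/*
  illustrative sketch (not part of the original code) of what a caller
  is expected to send to CTDB_SRVID_TAKEOVER_RUN: a structure carrying
  its own pnn and the srvid it listens on for the int32_t result, with
  srvid 0 meaning "no reply wanted".  see verify_ip_allocation() below
  for an in-tree caller.

    struct rd_memdump_reply rd;
    TDB_DATA data;

    rd.pnn     = ctdb_get_pnn(ctdb);
    rd.srvid   = 0;
    data.dptr  = (uint8_t *)&rd;
    data.dsize = sizeof(rd);
    ctdb_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
*/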
1856
1857 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1858 {
1859         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1860         TDB_DATA result;
1861         int32_t ret;
1862         struct ip_reallocate_list *callers;
1863
1864         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1865         ret = ctdb_takeover_run(ctdb, rec->nodemap);
1866         result.dsize = sizeof(int32_t);
1867         result.dptr  = (uint8_t *)&ret;
1868
1869         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
1870
1871                 /* Someone that sent srvid==0 does not want a reply */
1872                 if (callers->rd->srvid == 0) {
1873                         continue;
1874                 }
1875                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
1876                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
1877                                   (unsigned long long)callers->rd->srvid));
1878                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
1879                 if (ret != 0) {
1880                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
1881                                          "message to %u:%llu\n",
1882                                          (unsigned)callers->rd->pnn,
1883                                          (unsigned long long)callers->rd->srvid));
1884                 }
1885         }
1886
1887         talloc_free(tmp_ctx);
1888         talloc_free(rec->ip_reallocate_ctx);
1889         rec->ip_reallocate_ctx = NULL;
1890         rec->reallocate_callers = NULL;
1891         
1892 }
1893
1894
1895 /*
1896   handler for recovery master elections
1897 */
1898 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1899                              TDB_DATA data, void *private_data)
1900 {
1901         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1902         int ret;
1903         struct election_message *em = (struct election_message *)data.dptr;
1904         TALLOC_CTX *mem_ctx;
1905
1906         /* we got an election packet - update the timeout for the election */
1907         talloc_free(rec->election_timeout);
1908         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1909                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1910                                                 ctdb_election_timeout, rec);
1911
1912         mem_ctx = talloc_new(ctdb);
1913
1914         /* someone called an election. check their election data
1915            and if we disagree and we would rather be the elected node, 
1916            send a new election message to all other nodes
1917          */
1918         if (ctdb_election_win(rec, em)) {
1919                 if (!rec->send_election_te) {
1920                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1921                                                                 timeval_current_ofs(0, 500000),
1922                                                                 election_send_request, rec);
1923                 }
1924                 talloc_free(mem_ctx);
1925                 /*unban_all_nodes(ctdb);*/
1926                 return;
1927         }
1928         
1929         /* we didn't win */
1930         talloc_free(rec->send_election_te);
1931         rec->send_election_te = NULL;
1932
1933         if (ctdb->tunable.verify_recovery_lock != 0) {
1934                 /* release the recmaster lock */
1935                 if (em->pnn != ctdb->pnn &&
1936                     ctdb->recovery_lock_fd != -1) {
1937                         close(ctdb->recovery_lock_fd);
1938                         ctdb->recovery_lock_fd = -1;
1939                         unban_all_nodes(ctdb);
1940                 }
1941         }
1942
1943         /* ok, let that guy become recmaster then */
1944         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1945         if (ret != 0) {
1946                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1947                 talloc_free(mem_ctx);
1948                 return;
1949         }
1950
1951         talloc_free(mem_ctx);
1952         return;
1953 }
1954
1955
1956 /*
1957   force the start of the election process
1958  */
1959 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1960                            struct ctdb_node_map *nodemap)
1961 {
1962         int ret;
1963         struct ctdb_context *ctdb = rec->ctdb;
1964
1965         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1966
1967         /* set all nodes to recovery mode to stop all internode traffic */
1968         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1969         if (ret != 0) {
1970                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1971                 return;
1972         }
1973
1974         talloc_free(rec->election_timeout);
1975         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1976                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1977                                                 ctdb_election_timeout, rec);
1978
1979         ret = send_election_request(rec, pnn, true);
1980         if (ret!=0) {
1981                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
1982                 return;
1983         }
1984
1985         /* wait for a few seconds to collect all responses */
1986         ctdb_wait_election(rec);
1987 }
1988
1989
1990
1991 /*
1992   handler for when a node changes its flags
1993 */
1994 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1995                             TDB_DATA data, void *private_data)
1996 {
1997         int ret;
1998         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1999         struct ctdb_node_map *nodemap=NULL;
2000         TALLOC_CTX *tmp_ctx;
2001         uint32_t changed_flags;
2002         int i;
2003         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2004         int disabled_flag_changed;
2005
2006         if (data.dsize != sizeof(*c)) {
2007                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2008                 return;
2009         }
2010
2011         tmp_ctx = talloc_new(ctdb);
2012         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2013
2014         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2015         if (ret != 0) {
2016                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2017                 talloc_free(tmp_ctx);
2018                 return;         
2019         }
2020
2021
2022         for (i=0;i<nodemap->num;i++) {
2023                 if (nodemap->nodes[i].pnn == c->pnn) break;
2024         }
2025
2026         if (i == nodemap->num) {
2027                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2028                 talloc_free(tmp_ctx);
2029                 return;
2030         }
2031
2032         changed_flags = c->old_flags ^ c->new_flags;
2033
2034         if (nodemap->nodes[i].flags != c->new_flags) {
2035                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2036         }
2037
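        /* NODE_FLAGS_DISABLED covers both the "unhealthy" and the
           "permanently disabled" state; remember whether it flipped so
           we can decide below whether a takeover run is needed */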
2038         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2039
2040         nodemap->nodes[i].flags = c->new_flags;
2041
2042         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2043                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2044
2045         if (ret == 0) {
2046                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2047                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2048         }
2049         
2050         if (ret == 0 &&
2051             ctdb->recovery_master == ctdb->pnn &&
2052             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2053                 /* Only do the takeover run if the "permanently disabled" or
2054                    "unhealthy" flags changed, since these cause an ip failover
2055                    but not a recovery.
2056                    If the node became disconnected or banned this will also
2057                    lead to an ip address failover, but that is handled
2058                    during recovery.
2059                 */
2060                 if (disabled_flag_changed) {
2061                         rec->need_takeover_run = true;
2062                 }
2063         }
2064
2065         talloc_free(tmp_ctx);
2066 }
2067
2068 /*
2069   handler for when we need to push out flag changes to all other nodes
2070 */
2071 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2072                             TDB_DATA data, void *private_data)
2073 {
2074         int ret;
2075         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2076         struct ctdb_node_map *nodemap=NULL;
2077         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2078         uint32_t recmaster;
2079         uint32_t *nodes;
2080
2081         /* find the recovery master */
2082         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2083         if (ret != 0) {
2084                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2085                 talloc_free(tmp_ctx);
2086                 return;
2087         }
2088
2089         /* read the node flags from the recmaster */
2090         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2091         if (ret != 0) {
2092                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", recmaster));
2093                 talloc_free(tmp_ctx);
2094                 return;
2095         }
2096         if (c->pnn >= nodemap->num) {
2097                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2098                 talloc_free(tmp_ctx);
2099                 return;
2100         }
2101
2102         /* send the flags update to all connected nodes */
2103         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2104
2105         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2106                                       nodes, 0, CONTROL_TIMEOUT(),
2107                                       false, data,
2108                                       NULL, NULL,
2109                                       NULL) != 0) {
2110                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2111
2112                 talloc_free(tmp_ctx);
2113                 return;
2114         }
2115
2116         talloc_free(tmp_ctx);
2117 }
2118
2119
2120 struct verify_recmode_normal_data {
2121         uint32_t count;
2122         enum monitor_result status;
2123 };
2124
2125 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2126 {
2127         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2128
2129
2130         /* one more node has responded with recmode data*/
2131         rmdata->count--;
2132
2133         /* if we failed to get the recmode, then return an error and let
2134            the main loop try again.
2135         */
2136         if (state->state != CTDB_CONTROL_DONE) {
2137                 if (rmdata->status == MONITOR_OK) {
2138                         rmdata->status = MONITOR_FAILED;
2139                 }
2140                 return;
2141         }
2142
2143         /* if we got a response, then the recmode will be stored in the
2144            status field
2145         */
2146         if (state->status != CTDB_RECOVERY_NORMAL) {
2147                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2148                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2149         }
2150
2151         return;
2152 }
2153
2154
2155 /* verify that all nodes are in normal recovery mode */
2156 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2157 {
2158         struct verify_recmode_normal_data *rmdata;
2159         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2160         struct ctdb_client_control_state *state;
2161         enum monitor_result status;
2162         int j;
2163         
2164         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2165         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2166         rmdata->count  = 0;
2167         rmdata->status = MONITOR_OK;
2168
2169         /* loop over all active nodes and send an async getrecmode call to 
2170            them*/
2171         for (j=0; j<nodemap->num; j++) {
2172                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2173                         continue;
2174                 }
2175                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2176                                         CONTROL_TIMEOUT(), 
2177                                         nodemap->nodes[j].pnn);
2178                 if (state == NULL) {
2179                         /* we failed to send the control, treat this as 
2180                            an error and try again next iteration
2181                         */                      
2182                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2183                         talloc_free(mem_ctx);
2184                         return MONITOR_FAILED;
2185                 }
2186
2187                 /* set up the callback functions */
2188                 state->async.fn = verify_recmode_normal_callback;
2189                 state->async.private_data = rmdata;
2190
2191                 /* one more control to wait for to complete */
2192                 rmdata->count++;
2193         }
2194
2195
2196         /* now wait for up to the maximum number of seconds allowed
2197            or until all nodes we expect a response from have replied
2198         */
2199         while (rmdata->count > 0) {
2200                 event_loop_once(ctdb->ev);
2201         }
2202
2203         status = rmdata->status;
2204         talloc_free(mem_ctx);
2205         return status;
2206 }
2207
2208
2209 struct verify_recmaster_data {
2210         struct ctdb_recoverd *rec;
2211         uint32_t count;
2212         uint32_t pnn;
2213         enum monitor_result status;
2214 };
2215
2216 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2217 {
2218         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2219
2220
2221         /* one more node has responded with recmaster data*/
2222         rmdata->count--;
2223
2224         /* if we failed to get the recmaster, then return an error and let
2225            the main loop try again.
2226         */
2227         if (state->state != CTDB_CONTROL_DONE) {
2228                 if (rmdata->status == MONITOR_OK) {
2229                         rmdata->status = MONITOR_FAILED;
2230                 }
2231                 return;
2232         }
2233
2234         /* if we got a response, then the recmaster will be stored in the
2235            status field
2236         */
2237         if (state->status != rmdata->pnn) {
2238                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2239                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2240                 rmdata->status = MONITOR_ELECTION_NEEDED;
2241         }
2242
2243         return;
2244 }
2245
2246
2247 /* verify that all nodes agree that we are the recmaster */
2248 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2249 {
2250         struct ctdb_context *ctdb = rec->ctdb;
2251         struct verify_recmaster_data *rmdata;
2252         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2253         struct ctdb_client_control_state *state;
2254         enum monitor_result status;
2255         int j;
2256         
2257         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2258         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2259         rmdata->rec    = rec;
2260         rmdata->count  = 0;
2261         rmdata->pnn    = pnn;
2262         rmdata->status = MONITOR_OK;
2263
2264         /* loop over all active nodes and send an async getrecmaster call to 
2265            them*/
2266         for (j=0; j<nodemap->num; j++) {
2267                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2268                         continue;
2269                 }
2270                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2271                                         CONTROL_TIMEOUT(),
2272                                         nodemap->nodes[j].pnn);
2273                 if (state == NULL) {
2274                         /* we failed to send the control, treat this as 
2275                            an error and try again next iteration
2276                         */                      
2277                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2278                         talloc_free(mem_ctx);
2279                         return MONITOR_FAILED;
2280                 }
2281
2282                 /* set up the callback functions */
2283                 state->async.fn = verify_recmaster_callback;
2284                 state->async.private_data = rmdata;
2285
2286                 /* one more control to wait for to complete */
2287                 rmdata->count++;
2288         }
2289
2290
2291         /* now wait for up to the maximum number of seconds allowed
2292            or until all nodes we expect a response from have replied
2293         */
2294         while (rmdata->count > 0) {
2295                 event_loop_once(ctdb->ev);
2296         }
2297
2298         status = rmdata->status;
2299         talloc_free(mem_ctx);
2300         return status;
2301 }
2302
2303
2304 /* called to check that the allocation of public ip addresses is ok.
2305 */
2306 static int verify_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2307 {
2308         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2309         struct ctdb_all_public_ips *ips = NULL;
2310         struct ctdb_uptime *uptime1 = NULL;
2311         struct ctdb_uptime *uptime2 = NULL;
2312         int ret, j;
2313
2314         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2315                                 CTDB_CURRENT_NODE, &uptime1);
2316         if (ret != 0) {
2317                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2318                 talloc_free(mem_ctx);
2319                 return -1;
2320         }
2321
2322         /* read the ip allocation from the local node */
2323         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2324         if (ret != 0) {
2325                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2326                 talloc_free(mem_ctx);
2327                 return -1;
2328         }
2329
2330         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2331                                 CTDB_CURRENT_NODE, &uptime2);
2332         if (ret != 0) {
2333                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2334                 talloc_free(mem_ctx);
2335                 return -1;
2336         }
2337
2338         /* skip the check if the startrecovery time has changed */
2339         if (timeval_compare(&uptime1->last_recovery_started,
2340                             &uptime2->last_recovery_started) != 0) {
2341                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2342                 talloc_free(mem_ctx);
2343                 return 0;
2344         }
2345
2346         /* skip the check if the endrecovery time has changed */
2347         if (timeval_compare(&uptime1->last_recovery_finished,
2348                             &uptime2->last_recovery_finished) != 0) {
2349                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2350                 talloc_free(mem_ctx);
2351                 return 0;
2352         }
2353
2354         /* skip the check if we have started but not finished recovery */
2355         if (timeval_compare(&uptime1->last_recovery_finished,
2356                             &uptime1->last_recovery_started) != 1) {
2357                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2358                 talloc_free(mem_ctx);
2359
2360                 return 0;
2361         }
2362
2363         /* verify that we have the ip addresses we should have
2364            and that we don't have ones we shouldn't have.
2365            if we find an inconsistency we ask the recmaster to
2366            perform a takeover run by sending it a
2367            CTDB_SRVID_TAKEOVER_RUN message
2368         */
2369         for (j=0; j<ips->num; j++) {
2370                 if (ips->ips[j].pnn == pnn) {
2371                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2372                                 struct takeover_run_reply rd;
2373                                 TDB_DATA data;
2374
2375                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2376                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2377
2378                                 rd.pnn   = ctdb->pnn;
2379                                 rd.srvid = 0;
2380                                 data.dptr = (uint8_t *)&rd;
2381                                 data.dsize = sizeof(rd);
2382
2383                                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2384                                 if (ret != 0) {
2385                                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2386                                 }
2387                         }
2388                 } else {
2389                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2390                                 struct takeover_run_reply rd;
2391                                 TDB_DATA data;
2392
2393                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2394                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2395
2396                                 rd.pnn   = ctdb->pnn;
2397                                 rd.srvid = 0;
2398                                 data.dptr = (uint8_t *)&rd;
2399                                 data.dsize = sizeof(rd);
2400
2401                                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2402                                 if (ret != 0) {
2403                                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2404                                 }
2405                         }
2406                 }
2407         }
2408
2409         talloc_free(mem_ctx);
2410         return 0;
2411 }
2412
2413
2414 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2415 {
2416         struct ctdb_node_map **remote_nodemaps = callback_data;
2417
2418         if (node_pnn >= ctdb->num_nodes) {
2419                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2420                 return;
2421         }
2422
2423         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2424
2425 }
2426
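/*
  fetch the nodemap from every active node.  remote_nodemaps must have
  room for ctdb->num_nodes entries, since the async callback above
  stores each reply indexed by the replying node's pnn.
*/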
2427 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2428         struct ctdb_node_map *nodemap,
2429         struct ctdb_node_map **remote_nodemaps)
2430 {
2431         uint32_t *nodes;
2432
2433         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2434         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2435                                         nodes, 0,
2436                                         CONTROL_TIMEOUT(), false, tdb_null,
2437                                         async_getnodemap_callback,
2438                                         NULL,
2439                                         remote_nodemaps) != 0) {
2440                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2441
2442                 return -1;
2443         }
2444
2445         return 0;
2446 }
2447
2448 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2449 struct ctdb_check_reclock_state {
2450         struct ctdb_context *ctdb;
2451         struct timeval start_time;
2452         int fd[2];
2453         pid_t child;
2454         struct timed_event *te;
2455         struct fd_event *fde;
2456         enum reclock_child_status status;
2457 };
2458
2459 /* when we free the reclock state we must kill any child process.
2460 */
2461 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2462 {
2463         struct ctdb_context *ctdb = state->ctdb;
2464
2465         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2466
2467         if (state->fd[0] != -1) {
2468                 close(state->fd[0]);
2469                 state->fd[0] = -1;
2470         }
2471         if (state->fd[1] != -1) {
2472                 close(state->fd[1]);
2473                 state->fd[1] = -1;
2474         }
2475         kill(state->child, SIGKILL);
2476         return 0;
2477 }
2478
2479 /*
2480   called if our check_reclock child times out. this would happen if
2481   i/o to the reclock file blocks.
2482  */
2483 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2484                                          struct timeval t, void *private_data)
2485 {
2486         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2487                                            struct ctdb_check_reclock_state);
2488
2489         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out. CFS slow to grant locks?\n"));
2490         state->status = RECLOCK_TIMEOUT;
2491 }
2492
2493 /* this is called when the child process has completed checking the reclock
2494    file and has written data back to us through the pipe.
2495 */
2496 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2497                              uint16_t flags, void *private_data)
2498 {
2499         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2500                                              struct ctdb_check_reclock_state);
2501         char c = 0;
2502         int ret;
2503
2504         /* we got a response from our child process so we can abort the
2505            timeout.
2506         */
2507         talloc_free(state->te);
2508         state->te = NULL;
2509
2510         ret = read(state->fd[0], &c, 1);
2511         if (ret != 1 || c != RECLOCK_OK) {
2512                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2513                 state->status = RECLOCK_FAILED;
2514
2515                 return;
2516         }
2517
2518         state->status = RECLOCK_OK;
2519         return;
2520 }
2521
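
/*
  verify that we can still read the recovery lock file.  the actual
  pread() is done in a forked child so that a hung cluster filesystem
  can only stall the child; the parent waits on a pipe, a 15 second
  timed event breaks the wait if the child never answers, and an
  explicit error byte from the child makes us drop the lock fd and
  report failure.
*/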
2522 static int check_recovery_lock(struct ctdb_context *ctdb)
2523 {
2524         int ret;
2525         struct ctdb_check_reclock_state *state;
2526         pid_t parent = getpid();
2527
2528         if (ctdb->recovery_lock_fd == -1) {
2529                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2530                 return -1;
2531         }
2532
2533         state = talloc(ctdb, struct ctdb_check_reclock_state);
2534         CTDB_NO_MEMORY(ctdb, state);
2535
2536         state->ctdb = ctdb;
2537         state->start_time = timeval_current();
2538         state->status = RECLOCK_CHECKING;
2539         state->fd[0] = -1;
2540         state->fd[1] = -1;
2541
2542         ret = pipe(state->fd);
2543         if (ret != 0) {
2544                 talloc_free(state);
2545                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2546                 return -1;
2547         }
2548
2549         state->child = fork();
2550         if (state->child == (pid_t)-1) {
2551                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2552                 close(state->fd[0]);
2553                 state->fd[0] = -1;
2554                 close(state->fd[1]);
2555                 state->fd[1] = -1;
2556                 talloc_free(state);
2557                 return -1;
2558         }
2559
2560         if (state->child == 0) {
2561                 char cc = RECLOCK_OK;
2562                 close(state->fd[0]);
2563                 state->fd[0] = -1;
2564
2565                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2566                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2567                         cc = RECLOCK_FAILED;
2568                 }
2569
2570                 write(state->fd[1], &cc, 1);
2571                 /* make sure we die when our parent dies */
2572                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2573                         sleep(5);
2574                         write(state->fd[1], &cc, 1);
2575                 }
2576                 _exit(0);
2577         }
2578         close(state->fd[1]);
2579         state->fd[1] = -1;
2580         set_close_on_exec(state->fd[0]);
2581
2582         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2583
2584         talloc_set_destructor(state, check_reclock_destructor);
2585
2586         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2587                                     ctdb_check_reclock_timeout, state);
2588         if (state->te == NULL) {
2589                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2590                 talloc_free(state);
2591                 return -1;
2592         }
2593
2594         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2595                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2596                                 reclock_child_handler,
2597                                 (void *)state);
2598
2599         if (state->fde == NULL) {
2600                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2601                 talloc_free(state);
2602                 return -1;
2603         }
2604
2605         while (state->status == RECLOCK_CHECKING) {
2606                 event_loop_once(ctdb->ev);
2607         }
2608
2609         if (state->status == RECLOCK_FAILED) {
2610                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2611                 close(ctdb->recovery_lock_fd);
2612                 ctdb->recovery_lock_fd = -1;
2613                 talloc_free(state);
2614                 return -1;
2615         }
2616
2617         talloc_free(state);
2618         return 0;
2619 }
2620
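/*
  fetch the current reclock file setting from the main daemon and bring
  the recovery daemon's view in sync with it: handle the reclock file
  being disabled, being set for the first time, and being changed to a
  different path.
*/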
2621 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2622 {
2623         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2624         const char *reclockfile;
2625
2626         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2627                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2628                 talloc_free(tmp_ctx);
2629                 return -1;      
2630         }
2631
2632         if (reclockfile == NULL) {
2633                 if (ctdb->recovery_lock_file != NULL) {
2634                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2635                         talloc_free(ctdb->recovery_lock_file);
2636                         ctdb->recovery_lock_file = NULL;
2637                         if (ctdb->recovery_lock_fd != -1) {
2638                                 close(ctdb->recovery_lock_fd);
2639                                 ctdb->recovery_lock_fd = -1;
2640                         }
2641                 }
2642                 ctdb->tunable.verify_recovery_lock = 0;
2643                 talloc_free(tmp_ctx);
2644                 return 0;
2645         }
2646
2647         if (ctdb->recovery_lock_file == NULL) {
2648                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2649                 if (ctdb->recovery_lock_fd != -1) {
2650                         close(ctdb->recovery_lock_fd);
2651                         ctdb->recovery_lock_fd = -1;
2652                 }
2653                 talloc_free(tmp_ctx);
2654                 return 0;
2655         }
2656
2657
2658         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2659                 talloc_free(tmp_ctx);
2660                 return 0;
2661         }
2662
2663         talloc_free(ctdb->recovery_lock_file);
2664         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2665         ctdb->tunable.verify_recovery_lock = 0;
2666         if (ctdb->recovery_lock_fd != -1) {
2667                 close(ctdb->recovery_lock_fd);
2668                 ctdb->recovery_lock_fd = -1;
2669         }
2670
2671         talloc_free(tmp_ctx);
2672         return 0;
2673 }
2674                 
2675 /*
2676   the main monitoring loop
2677  */
2678 static void monitor_cluster(struct ctdb_context *ctdb)
2679 {
2680         uint32_t pnn;
2681         TALLOC_CTX *mem_ctx=NULL;
2682         struct ctdb_node_map *nodemap=NULL;
2683         struct ctdb_node_map *recmaster_nodemap=NULL;
2684         struct ctdb_node_map **remote_nodemaps=NULL;
2685         struct ctdb_vnn_map *vnnmap=NULL;
2686         struct ctdb_vnn_map *remote_vnnmap=NULL;
2687         int32_t debug_level;
2688         int i, j, ret;
2689         struct ctdb_recoverd *rec;
2690
2691         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2692
2693         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2694         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2695
2696         rec->ctdb = ctdb;
2697
2698         rec->priority_time = timeval_current();
2699
2700         /* register a message port for sending memory dumps */
2701         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2702
2703         /* register a message port for recovery elections */
2704         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2705
2706         /* when nodes are disabled/enabled */
2707         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2708
2709         /* when we are asked to push out a flag change */
2710         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2711
2712         /* register a message port for vacuum fetch */
2713         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2714
2715         /* register a message port for reloadnodes  */
2716         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2717
2718         /* register a message port for performing a takeover run */
2719         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2720
2721         /* register a message port for disabling the ip check for a short while */
2722         ctdb_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
2723
2724 again:
2725         if (mem_ctx) {
2726                 talloc_free(mem_ctx);
2727                 mem_ctx = NULL;
2728         }
2729         mem_ctx = talloc_new(ctdb);
2730         if (!mem_ctx) {
2731                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2732                 exit(-1);
2733         }
2734
2735         /* we only check for recovery once every second */
2736         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2737
2738         /* verify that the main daemon is still running */
2739         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2740                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2741                 exit(-1);
2742         }
2743
2744         /* ping the local daemon to tell it we are alive */
2745         ctdb_ctrl_recd_ping(ctdb);
2746
2747         if (rec->election_timeout) {
2748                 /* an election is in progress */
2749                 goto again;
2750         }
2751
2752         /* read the debug level from the parent and update locally */
2753         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2754         if (ret !=0) {
2755                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2756                 goto again;
2757         }
2758         LogLevel = debug_level;
2759
2760
2761         /* We must check if we need to ban a node here, but we want to do this
2762            as early as possible so we don't wait until we have pulled the node
2763            map from the local node. That's why we have the hardcoded value 20.
2764         */
2765         for (i=0; i<ctdb->num_nodes; i++) {
2766                 struct ctdb_banning_state *ban_state;
2767
2768                 if (ctdb->nodes[i]->ban_state == NULL) {
2769                         continue;
2770                 }
2771                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2772                 if (ban_state->count < 20) {
2773                         continue;
2774                 }
2775                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2776                         ctdb->nodes[i]->pnn, ban_state->count,
2777                         ctdb->tunable.recovery_ban_period));
2778                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2779                 ban_state->count = 0;
2780         }
2781
2782         /* get relevant tunables */
2783         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2784         if (ret != 0) {
2785                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2786                 goto again;
2787         }
2788
2789         /* get the current recovery lock file from the server */
2790         if (update_recovery_lock_file(ctdb) != 0) {
2791                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2792                 goto again;
2793         }
2794
2795         /* Make sure that if recovery lock verification becomes disabled,
2796            we close the recovery lock file
2797         */
2798         if (ctdb->tunable.verify_recovery_lock == 0) {
2799                 if (ctdb->recovery_lock_fd != -1) {
2800                         close(ctdb->recovery_lock_fd);
2801                         ctdb->recovery_lock_fd = -1;
2802                 }
2803         }
2804
2805         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2806         if (pnn == (uint32_t)-1) {
2807                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2808                 goto again;
2809         }
2810
2811         /* get the vnnmap */
2812         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2813         if (ret != 0) {
2814                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2815                 goto again;
2816         }
2817
2818
2819         /* get number of nodes */
2820         if (rec->nodemap) {
2821                 talloc_free(rec->nodemap);
2822                 rec->nodemap = NULL;
2823                 nodemap=NULL;
2824         }
2825         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2826         if (ret != 0) {
2827                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2828                 goto again;
2829         }
2830         nodemap = rec->nodemap;
2831
2832         /* check which node is the recovery master */
2833         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2834         if (ret != 0) {
2835                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2836                 goto again;
2837         }
2838
2839         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
2840         if (rec->recmaster != pnn) {
2841                 if (rec->ip_reallocate_ctx != NULL) {
2842                         talloc_free(rec->ip_reallocate_ctx);
2843                         rec->ip_reallocate_ctx = NULL;
2844                         rec->reallocate_callers = NULL;
2845                 }
2846         }
2847         /* if there are takeovers requested, perform it and notify the waiters */
2848         if (rec->reallocate_callers) {
2849                 process_ipreallocate_requests(ctdb, rec);
2850         }
2851
2852         if (rec->recmaster == (uint32_t)-1) {
2853                 DEBUG(DEBUG_NOTICE,(__location__ " No recovery master is set yet - forcing election\n"));
2854                 force_election(rec, pnn, nodemap);
2855                 goto again;
2856         }
2857
2858
2859         /* if the local daemon is STOPPED, we verify that the databases are
2860            also frozen and that the recovery mode is set to active
2861         */
2862         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
2863                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2864                 if (ret != 0) {
2865                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
2866                 }
2867                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2868                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
2869
2870                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
2871                         if (ret != 0) {
2872                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
2873                                 goto again;
2874                         }
2875                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2876                         if (ret != 0) {
2877                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
2878
2879                                 goto again;
2880                         }
2881                         goto again;
2882                 }
2883         }
2884         /* If the local node is stopped, verify that we are not the recmaster
2885            and, if we are, yield that role by forcing a new election
2886         */
2887         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
2888                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
2889                 force_election(rec, pnn, nodemap);
2890                 goto again;
2891         }
2892         
2893         /* TODO: check that we (recovery daemon) and the local ctdb daemon
2894            agree on whether we are banned or not (not yet implemented)
2895         */
2896
2897
2898         /* remember our own node flags */
2899         rec->node_flags = nodemap->nodes[pnn].flags;
2900
2901         /* count how many active nodes there are */
2902         rec->num_active    = 0;
2903         rec->num_connected = 0;
2904         for (i=0; i<nodemap->num; i++) {
2905                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2906                         rec->num_active++;
2907                 }
2908                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2909                         rec->num_connected++;
2910                 }
2911         }
2912
2913
2914         /* verify that the recmaster node is still active */
2915         for (j=0; j<nodemap->num; j++) {
2916                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2917                         break;
2918                 }
2919         }
2920
2921         if (j == nodemap->num) {
2922                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2923                 force_election(rec, pnn, nodemap);
2924                 goto again;
2925         }
2926
2927         /* if recovery master is disconnected we must elect a new recmaster */
2928         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2929                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2930                 force_election(rec, pnn, nodemap);
2931                 goto again;
2932         }
2933
2934         /* grab the nodemap from the recovery master to check if it is banned */
2935         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2936                                    mem_ctx, &recmaster_nodemap);
2937         if (ret != 0) {
2938                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2939                           nodemap->nodes[j].pnn));
2940                 goto again;
2941         }
2942
2943
2944         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2945                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2946                 force_election(rec, pnn, nodemap);
2947                 goto again;
2948         }
2949
2950
2951         /* verify that we have all the ip addresses we should have and that
2952          * we don't have addresses we shouldn't have.
2953          */
2954         if (ctdb->do_checkpublicip) {
2955                 if (rec->ip_check_disable_ctx == NULL) {
2956                         if (verify_ip_allocation(ctdb, rec, pnn) != 0) {
2957                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
2958                         }
2959                 }
2960         }
2961
2962
2963         /* if we are not the recmaster then we do not need to check
2964            if recovery is needed
2965          */
2966         if (pnn != rec->recmaster) {
2967                 goto again;
2968         }
2969
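             /* Everything from this point on runs only on the recovery
              * master; all other nodes have already jumped back to 'again'
              * above.
              */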
2970
2971         /* ensure our local copies of flags are right */
2972         ret = update_local_flags(rec, nodemap);
2973         if (ret == MONITOR_ELECTION_NEEDED) {
2974                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2975                 force_election(rec, pnn, nodemap);
2976                 goto again;
2977         }
2978         if (ret != MONITOR_OK) {
2979                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2980                 goto again;
2981         }
2982
2983         /* update the list of public ips that a node can handle for
2984            all connected nodes
2985         */
2986         if (ctdb->num_nodes != nodemap->num) {
2987                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2988                 reload_nodes_file(ctdb);
2989                 goto again;
2990         }
2991         for (j=0; j<nodemap->num; j++) {
2992                 /* release any existing data */
2993                 if (ctdb->nodes[j]->public_ips) {
2994                         talloc_free(ctdb->nodes[j]->public_ips);
2995                         ctdb->nodes[j]->public_ips = NULL;
2996                 }
2997
2998                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2999                         continue;
3000                 }
3001
3002                 /* grab a new shiny list of public ips from the node */
3003                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
3004                         ctdb->nodes[j]->pnn, 
3005                         ctdb->nodes,
3006                         &ctdb->nodes[j]->public_ips)) {
3007                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
3008                                 ctdb->nodes[j]->pnn));
3009                         goto again;
3010                 }
3011         }
3012
3013
3014         /* verify that all active nodes agree that we are the recmaster */
3015         switch (verify_recmaster(rec, nodemap, pnn)) {
3016         case MONITOR_RECOVERY_NEEDED:
3017                 /* can not happen */
3018                 goto again;
3019         case MONITOR_ELECTION_NEEDED:
3020                 force_election(rec, pnn, nodemap);
3021                 goto again;
3022         case MONITOR_OK:
3023                 break;
3024         case MONITOR_FAILED:
3025                 goto again;
3026         }
3027
3028
3029         if (rec->need_recovery) {
3030                 /* a previous recovery didn't finish */
3031                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3032                 goto again;             
3033         }
3034
3035         /* verify that all active nodes are in normal mode 
3036            and not in recovery mode 
3037         */
3038         switch (verify_recmode(ctdb, nodemap)) {
3039         case MONITOR_RECOVERY_NEEDED:
3040                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3041                 goto again;
3042         case MONITOR_FAILED:
3043                 goto again;
3044         case MONITOR_ELECTION_NEEDED:
3045                 /* can not happen */
3046         case MONITOR_OK:
3047                 break;
3048         }
3049
3050
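             /* When reclock verification is enabled, the recovery lock file
              * acts as a cluster wide mutex that protects against split
              * brain; as recmaster we are expected to hold it, so make sure
              * it has not gone stale underneath us.
              */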
3051         if (ctdb->tunable.verify_recovery_lock != 0) {
3052                 /* we should have the reclock - check that it is not stale */
3053                 ret = check_recovery_lock(ctdb);
3054                 if (ret != 0) {
3055                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3056                         ctdb_set_culprit(rec, ctdb->pnn);
3057                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3058                         goto again;
3059                 }
3060         }
3061
3062         /* get the nodemap for all active remote nodes
3063          */
3064         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3065         if (remote_nodemaps == NULL) {
3066                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3067                 goto again;
3068         }
3069         for(i=0; i<nodemap->num; i++) {
3070                 remote_nodemaps[i] = NULL;
3071         }
3072         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3073                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3074                 goto again;
3075         } 
3076
3077         /* verify that all other nodes have the same nodemap as we have
3078         */
3079         for (j=0; j<nodemap->num; j++) {
3080                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3081                         continue;
3082                 }
3083
3084                 if (remote_nodemaps[j] == NULL) {
3085                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3086                         ctdb_set_culprit(rec, j);
3087
3088                         goto again;
3089                 }
3090
3091                 /* if the nodes disagree on how many nodes there are
3092                    then this is a good reason to try recovery
3093                  */
3094                 if (remote_nodemaps[j]->num != nodemap->num) {
3095                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3096                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3097                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3098                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3099                         goto again;
3100                 }
3101
3102                 /* if the nodes disagree on which nodes exist and are
3103                    active, then that is also a good reason to do recovery
3104                  */
3105                 for (i=0;i<nodemap->num;i++) {
3106                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3107                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3108                                           nodemap->nodes[j].pnn, i, 
3109                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3110                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3111                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3112                                             vnnmap);
3113                                 goto again;
3114                         }
3115                 }
3116
3117                 /* verify the flags are consistent
3118                 */
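                     /* A node is authoritative for its own flags: if a remote
                      * node disagrees about its own flags (i == j) we push the
                      * remote node's view out to the cluster, otherwise we
                      * push the recmaster's view; in both cases we then force
                      * a recovery.
                      */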
3119                 for (i=0; i<nodemap->num; i++) {
3120                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3121                                 continue;
3122                         }
3123                         
3124                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3125                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3126                                   nodemap->nodes[j].pnn, 
3127                                   nodemap->nodes[i].pnn, 
3128                                   remote_nodemaps[j]->nodes[i].flags,
3129                                   nodemap->nodes[i].flags));
3130                                 if (i == j) {
3131                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3132                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3133                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3134                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3135                                                     vnnmap);
3136                                         goto again;
3137                                 } else {
3138                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3139                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3140                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3141                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3142                                                     vnnmap);
3143                                         goto again;
3144                                 }
3145                         }
3146                 }
3147         }
3148
3149
3150         /* there had better be the same number of lmasters in the vnnmap
3151            as there are active nodes, or we will have to do a recovery
3152          */
3153         if (vnnmap->size != rec->num_active) {
3154                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3155                           vnnmap->size, rec->num_active));
3156                 ctdb_set_culprit(rec, ctdb->pnn);
3157                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3158                 goto again;
3159         }
3160
3161         /* verify that all active nodes in the nodemap also exist in 
3162            the vnnmap.
3163          */
3164         for (j=0; j<nodemap->num; j++) {
3165                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3166                         continue;
3167                 }
3168                 if (nodemap->nodes[j].pnn == pnn) {
3169                         continue;
3170                 }
3171
3172                 for (i=0; i<vnnmap->size; i++) {
3173                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3174                                 break;
3175                         }
3176                 }
3177                 if (i == vnnmap->size) {
3178                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but does not exist in the vnnmap\n", 
3179                                   nodemap->nodes[j].pnn));
3180                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3181                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3182                         goto again;
3183                 }
3184         }
3185
3186         
3187         /* verify that all other nodes have the same vnnmap
3188            and are from the same generation
3189          */
3190         for (j=0; j<nodemap->num; j++) {
3191                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3192                         continue;
3193                 }
3194                 if (nodemap->nodes[j].pnn == pnn) {
3195                         continue;
3196                 }
3197
3198                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3199                                           mem_ctx, &remote_vnnmap);
3200                 if (ret != 0) {
3201                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3202                                   nodemap->nodes[j].pnn));
3203                         goto again;
3204                 }
3205
3206                 /* verify the vnnmap generation is the same */
3207                 if (vnnmap->generation != remote_vnnmap->generation) {
3208                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3209                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3210                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3211                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3212                         goto again;
3213                 }
3214
3215                 /* verify the vnnmap size is the same */
3216                 if (vnnmap->size != remote_vnnmap->size) {
3217                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3218                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3219                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3220                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3221                         goto again;
3222                 }
3223
3224                 /* verify the vnnmap is the same */
3225                 for (i=0;i<vnnmap->size;i++) {
3226                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3227                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3228                                           nodemap->nodes[j].pnn));
3229                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3230                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3231                                             vnnmap);
3232                                 goto again;
3233                         }
3234                 }
3235         }
3236
3237         /* we might need to change who has what IP assigned */
3238         if (rec->need_takeover_run) {
3239                 rec->need_takeover_run = false;
3240
3241                 /* execute the "startrecovery" event script on all nodes */
3242                 ret = run_startrecovery_eventscript(rec, nodemap);
3243                 if (ret!=0) {
3244                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3245                         ctdb_set_culprit(rec, ctdb->pnn);
3246                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3247                 }
3248
3249                 ret = ctdb_takeover_run(ctdb, nodemap);
3250                 if (ret != 0) {
3251                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3252                         ctdb_set_culprit(rec, ctdb->pnn);
3253                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3254                 }
3255
3256                 /* execute the "recovered" event script on all nodes */
3257                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3258 #if 0
3259 // we can't check whether the event completed successfully
3260 // since this script WILL fail if the node is in recovery mode
3261 // and if that race happens, the code here would just cause a second
3262 // cascading recovery.
3263                 if (ret!=0) {
3264                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3265                         ctdb_set_culprit(rec, ctdb->pnn);
3266                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3267                 }
3268 #endif
3269         }
3270
3271
3272         goto again;
3273
3274 }
3275
3276 /*
3277   event handler for when the main ctdbd dies
3278  */
3279 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3280                                  uint16_t flags, void *private_data)
3281 {
3282         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3283         _exit(1);
3284 }
3285
3286 /*
3287   called regularly to verify that the recovery daemon is still running
3288  */
3289 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3290                               struct timeval yt, void *p)
3291 {
3292         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3293
3294         if (kill(ctdb->recoverd_pid, 0) != 0) {
3295                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3296
3297                 ctdb_stop_recoverd(ctdb);
3298                 ctdb_stop_keepalive(ctdb);
3299                 ctdb_stop_monitoring(ctdb);
3300                 ctdb_release_all_ips(ctdb);
3301                 if (ctdb->methods != NULL) {
3302                         ctdb->methods->shutdown(ctdb);
3303                 }
3304                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3305
3306                 exit(10);       
3307         }
3308
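             /* re-arm the timer so this check runs again in 30 seconds */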
3309         event_add_timed(ctdb->ev, ctdb, 
3310                         timeval_current_ofs(30, 0),
3311                         ctdb_check_recd, ctdb);
3312 }
3313
3314 static void recd_sig_child_handler(struct event_context *ev,
3315         struct signal_event *se, int signum, int count,
3316         void *dont_care, 
3317         void *private_data)
3318 {
3319 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3320         int status;
3321         pid_t pid = -1;
3322
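             /* reap every child that has exited, without blocking */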
3323         while (pid != 0) {
3324                 pid = waitpid(-1, &status, WNOHANG);
3325                 if (pid == -1) {
3326                         if (errno != ECHILD) {
3327                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3328                         }
3329                         return;
3330                 }
3331                 if (pid > 0) {
3332                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3333                 }
3334         }
3335 }
3336
3337 /*
3338   startup the recovery daemon as a child of the main ctdb daemon
3339  */
3340 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3341 {
3342         int fd[2];
3343         struct signal_event *se;
3344
3345         if (pipe(fd) != 0) {
3346                 return -1;
3347         }
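             /* The pipe is used only to detect the death of the parent: the
              * parent keeps the write end open and the child watches the read
              * end, so when the parent exits the read end becomes readable
              * (EOF) and ctdb_recoverd_parent() terminates the child.
              */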
3348
3349         ctdb->ctdbd_pid = getpid();
3350
3351         ctdb->recoverd_pid = fork();
3352         if (ctdb->recoverd_pid == -1) {
3353                 return -1;
3354         }
3355         
3356         if (ctdb->recoverd_pid != 0) {
3357                 close(fd[0]);
3358                 event_add_timed(ctdb->ev, ctdb, 
3359                                 timeval_current_ofs(30, 0),
3360                                 ctdb_check_recd, ctdb);
3361                 return 0;
3362         }
3363
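             /* child: from here on we are the recovery daemon */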
3364         close(fd[1]);
3365
3366         srandom(getpid() ^ time(NULL));
3367
3368         if (switch_from_server_to_client(ctdb) != 0) {
3369                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. Shutting down.\n"));
3370                 exit(1);
3371         }
3372
3373         DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3374
3375         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3376                      ctdb_recoverd_parent, &fd[0]);     
3377
3378         /* set up a handler to pick up sigchld */
3379         se = event_add_signal(ctdb->ev, ctdb,
3380                                      SIGCHLD, 0,
3381                                      recd_sig_child_handler,
3382                                      ctdb);
3383         if (se == NULL) {
3384                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3385                 exit(1);
3386         }
3387
3388         monitor_cluster(ctdb);
3389
3390         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3391         return -1;
3392 }
3393
3394 /*
3395   shutdown the recovery daemon
3396  */
3397 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3398 {
3399         if (ctdb->recoverd_pid == 0) {
3400                 return;
3401         }
3402
3403         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3404         kill(ctdb->recoverd_pid, SIGTERM);
3405 }