recoverd: Also check if current node is in recovery when it is banned
server/ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
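/*
  per-node state used to decide when a node should be banned: how many
  times the node has been flagged as a recovery culprit, and when that
  was last reported (see ctdb_set_culprit_count() and do_recovery())
 */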
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
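/*
  both timeouts come from the recover_timeout and recover_interval
  tunables; the macros expect a local "ctdb" pointer to be in scope
  wherever they are expanded
 */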
70 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
71 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
72
73
74 /*
75   ban a node for a period of time
76  */
77 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
78 {
79         int ret;
80         struct ctdb_context *ctdb = rec->ctdb;
81         struct ctdb_ban_time bantime;
82        
83         if (!ctdb_validate_pnn(ctdb, pnn)) {
84                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
85                 return;
86         }
87
88         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
89
90         bantime.pnn  = pnn;
91         bantime.time = ban_time;
92
93         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
94         if (ret != 0) {
95                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
96                 return;
97         }
98
99 }
100
101 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
102
103
104 /*
105   run the "recovered" eventscript on all nodes
106  */
107 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
108 {
109         TALLOC_CTX *tmp_ctx;
110         uint32_t *nodes;
111
112         tmp_ctx = talloc_new(ctdb);
113         CTDB_NO_MEMORY(ctdb, tmp_ctx);
114
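        /* broadcasting CTDB_CONTROL_END_RECOVERY makes every active node
           run its "recovered" event script */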
115         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
116         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
117                                         nodes, 0,
118                                         CONTROL_TIMEOUT(), false, tdb_null,
119                                         NULL, NULL,
120                                         NULL) != 0) {
121                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
122
123                 talloc_free(tmp_ctx);
124                 return -1;
125         }
126
127         talloc_free(tmp_ctx);
128         return 0;
129 }
130
131 /*
132   remember the trouble maker
133  */
134 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
135 {
136         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
137         struct ctdb_banning_state *ban_state;
138
139         if (culprit >= ctdb->num_nodes) {
140                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
141                 return;
142         }
143
144         if (ctdb->nodes[culprit]->ban_state == NULL) {
145                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
146                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
147
148                 
149         }
150         ban_state = ctdb->nodes[culprit]->ban_state;
151         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
152                 /* this was the first time in a long while this node
153                    misbehaved so we will forgive any old transgressions.
154                 */
155                 ban_state->count = 0;
156         }
157
158         ban_state->count += count;
159         ban_state->last_reported_time = timeval_current();
160         rec->last_culprit_node = culprit;
161 }
162
163 /*
164   remember the trouble maker
165  */
166 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
167 {
168         ctdb_set_culprit_count(rec, culprit, 1);
169 }
170
171
172 /* this callback is called for every node that failed to execute the
173    start recovery event
174 */
175 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
176 {
177         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
178
179         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
180
181         ctdb_set_culprit(rec, node_pnn);
182 }
183
184 /*
185   run the "startrecovery" eventscript on all nodes
186  */
187 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
188 {
189         TALLOC_CTX *tmp_ctx;
190         uint32_t *nodes;
191         struct ctdb_context *ctdb = rec->ctdb;
192
193         tmp_ctx = talloc_new(ctdb);
194         CTDB_NO_MEMORY(ctdb, tmp_ctx);
195
196         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
198                                         nodes, 0,
199                                         CONTROL_TIMEOUT(), false, tdb_null,
200                                         NULL,
201                                         startrecovery_fail_callback,
202                                         rec) != 0) {
203                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
204                 talloc_free(tmp_ctx);
205                 return -1;
206         }
207
208         talloc_free(tmp_ctx);
209         return 0;
210 }
211
212 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
213 {
214         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
215                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback: %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
216                 return;
217         }
218         if (node_pnn < ctdb->num_nodes) {
219                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
220         }
221
222         if (node_pnn == ctdb->pnn) {
223                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
224         }
225 }
226
227 /*
228   update the node capabilities for all connected nodes
229  */
230 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
231 {
232         uint32_t *nodes;
233         TALLOC_CTX *tmp_ctx;
234
235         tmp_ctx = talloc_new(ctdb);
236         CTDB_NO_MEMORY(ctdb, tmp_ctx);
237
238         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
239         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
240                                         nodes, 0,
241                                         CONTROL_TIMEOUT(),
242                                         false, tdb_null,
243                                         async_getcap_callback, NULL,
244                                         NULL) != 0) {
245                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
246                 talloc_free(tmp_ctx);
247                 return -1;
248         }
249
250         talloc_free(tmp_ctx);
251         return 0;
252 }
253
254 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
255 {
256         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
257
258         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
259         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
260 }
261
262 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
263 {
264         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
265
266         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
267         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
268 }
269
270 /*
271   change recovery mode on all nodes
272  */
273 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
274 {
275         TDB_DATA data;
276         uint32_t *nodes;
277         TALLOC_CTX *tmp_ctx;
278
279         tmp_ctx = talloc_new(ctdb);
280         CTDB_NO_MEMORY(ctdb, tmp_ctx);
281
282         /* freeze all nodes */
283         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
284         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
285                 int i;
286
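                /* freeze the databases one priority level at a time; the
                   loop index is the database priority handed to the FREEZE
                   control */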
287                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
288                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
289                                                 nodes, i,
290                                                 CONTROL_TIMEOUT(),
291                                                 false, tdb_null,
292                                                 NULL,
293                                                 set_recmode_fail_callback,
294                                                 rec) != 0) {
295                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
296                                 talloc_free(tmp_ctx);
297                                 return -1;
298                         }
299                 }
300         }
301
302
303         data.dsize = sizeof(uint32_t);
304         data.dptr = (unsigned char *)&rec_mode;
305
306         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
307                                         nodes, 0,
308                                         CONTROL_TIMEOUT(),
309                                         false, data,
310                                         NULL, NULL,
311                                         NULL) != 0) {
312                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
313                 talloc_free(tmp_ctx);
314                 return -1;
315         }
316
317         talloc_free(tmp_ctx);
318         return 0;
319 }
320
321 /*
322   change recovery master on all nodes
323  */
324 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
325 {
326         TDB_DATA data;
327         TALLOC_CTX *tmp_ctx;
328         uint32_t *nodes;
329
330         tmp_ctx = talloc_new(ctdb);
331         CTDB_NO_MEMORY(ctdb, tmp_ctx);
332
333         data.dsize = sizeof(uint32_t);
334         data.dptr = (unsigned char *)&pnn;
335
336         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
337         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
338                                         nodes, 0,
339                                         CONTROL_TIMEOUT(), false, data,
340                                         NULL, NULL,
341                                         NULL) != 0) {
342                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
343                 talloc_free(tmp_ctx);
344                 return -1;
345         }
346
347         talloc_free(tmp_ctx);
348         return 0;
349 }
350
351 /* update all remote nodes to use the same db priority that we have.
352    this can fail if the remote node has not yet been upgraded to
353    support this function, so we always return success and never fail
354    a recovery if this call fails.
355 */
356 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
357         struct ctdb_node_map *nodemap, 
358         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
359 {
360         int db;
361         uint32_t *nodes;
362
363         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
364
365         /* step through all local databases */
366         for (db=0; db<dbmap->num;db++) {
367                 TDB_DATA data;
368                 struct ctdb_db_priority db_prio;
369                 int ret;
370
371                 db_prio.db_id     = dbmap->dbs[db].dbid;
372                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
373                 if (ret != 0) {
374                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
375                         continue;
376                 }
377
378                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
379
380                 data.dptr  = (uint8_t *)&db_prio;
381                 data.dsize = sizeof(db_prio);
382
383                 if (ctdb_client_async_control(ctdb,
384                                         CTDB_CONTROL_SET_DB_PRIORITY,
385                                         nodes, 0,
386                                         CONTROL_TIMEOUT(), false, data,
387                                         NULL, NULL,
388                                         NULL) != 0) {
389                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
390                 }
391         }
392
393         return 0;
394 }                       
395
396 /*
397   ensure all other nodes have attached to any databases that we have
398  */
399 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
400                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
401 {
402         int i, j, db, ret;
403         struct ctdb_dbid_map *remote_dbmap;
404
405         /* verify that all other nodes have all our databases */
406         for (j=0; j<nodemap->num; j++) {
407                 /* we don't need to check ourselves */
408                 if (nodemap->nodes[j].pnn == pnn) {
409                         continue;
410                 }
411                 /* dont check nodes that are unavailable */
412                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
413                         continue;
414                 }
415
416                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
417                                          mem_ctx, &remote_dbmap);
418                 if (ret != 0) {
419                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
420                         return -1;
421                 }
422
423                 /* step through all local databases */
424                 for (db=0; db<dbmap->num;db++) {
425                         const char *name;
426
427
428                         for (i=0;i<remote_dbmap->num;i++) {
429                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
430                                         break;
431                                 }
432                         }
433                         /* the remote node already has this database */
434                         if (i!=remote_dbmap->num) {
435                                 continue;
436                         }
437                         /* ok so we need to create this database */
438                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
439                                             mem_ctx, &name);
440                         if (ret != 0) {
441                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
442                                 return -1;
443                         }
444                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
445                                            mem_ctx, name, dbmap->dbs[db].persistent);
446                         if (ret != 0) {
447                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
448                                 return -1;
449                         }
450                 }
451         }
452
453         return 0;
454 }
455
456
457 /*
458   ensure we are attached to any databases that anyone else is attached to
459  */
460 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
461                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
462 {
463         int i, j, db, ret;
464         struct ctdb_dbid_map *remote_dbmap;
465
466         /* verify that we have all databases any other node has */
467         for (j=0; j<nodemap->num; j++) {
468                 /* we don't need to check ourselves */
469                 if (nodemap->nodes[j].pnn == pnn) {
470                         continue;
471                 }
472                 /* dont check nodes that are unavailable */
473                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
474                         continue;
475                 }
476
477                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
478                                          mem_ctx, &remote_dbmap);
479                 if (ret != 0) {
480                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
481                         return -1;
482                 }
483
484                 /* step through all databases on the remote node */
485                 for (db=0; db<remote_dbmap->num;db++) {
486                         const char *name;
487
488                         for (i=0;i<(*dbmap)->num;i++) {
489                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
490                                         break;
491                                 }
492                         }
493                         /* we already have this db locally */
494                         if (i!=(*dbmap)->num) {
495                                 continue;
496                         }
497                         /* ok so we need to create this database and
498                            rebuild dbmap
499                          */
500                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
501                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
502                         if (ret != 0) {
503                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
504                                           nodemap->nodes[j].pnn));
505                                 return -1;
506                         }
507                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
508                                            remote_dbmap->dbs[db].persistent);
509                         if (ret != 0) {
510                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
511                                 return -1;
512                         }
513                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
514                         if (ret != 0) {
515                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
516                                 return -1;
517                         }
518                 }
519         }
520
521         return 0;
522 }
523
524
525 /*
526   pull the remote database contents from one node into the recdb
527  */
528 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
529                                     struct tdb_wrap *recdb, uint32_t dbid,
530                                     bool persistent)
531 {
532         int ret;
533         TDB_DATA outdata;
534         struct ctdb_marshall_buffer *reply;
535         struct ctdb_rec_data *rec;
536         int i;
537         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
538
539         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
540                                CONTROL_TIMEOUT(), &outdata);
541         if (ret != 0) {
542                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
543                 talloc_free(tmp_ctx);
544                 return -1;
545         }
546
547         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
548
549         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
550                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
551                 talloc_free(tmp_ctx);
552                 return -1;
553         }
554         
555         rec = (struct ctdb_rec_data *)&reply->data[0];
556         
557         for (i=0;
558              i<reply->count;
559              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
560                 TDB_DATA key, data;
561                 struct ctdb_ltdb_header *hdr;
562                 TDB_DATA existing;
563                 
564                 key.dptr = &rec->data[0];
565                 key.dsize = rec->keylen;
566                 data.dptr = &rec->data[key.dsize];
567                 data.dsize = rec->datalen;
568                 
569                 hdr = (struct ctdb_ltdb_header *)data.dptr;
570
571                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
572                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
573                         talloc_free(tmp_ctx);
574                         return -1;
575                 }
576
577                 /* fetch the existing record, if any */
578                 existing = tdb_fetch(recdb->tdb, key);
579                 
580                 if (existing.dptr != NULL) {
581                         struct ctdb_ltdb_header header;
582                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
583                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
584                                          (unsigned)existing.dsize, srcnode));
585                                 free(existing.dptr);
586                                 talloc_free(tmp_ctx);
587                                 return -1;
588                         }
589                         header = *(struct ctdb_ltdb_header *)existing.dptr;
590                         free(existing.dptr);
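                        /* keep the pulled copy only if it has a higher rsn
                           than the record we already hold, or the same rsn
                           but a dmaster other than the recovery master */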
591                         if (!(header.rsn < hdr->rsn ||
592                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
593                                 continue;
594                         }
595                 }
596                 
597                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
598                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
599                         talloc_free(tmp_ctx);
600                         return -1;                              
601                 }
602         }
603
604         talloc_free(tmp_ctx);
605
606         return 0;
607 }
608
609 /*
610   pull all the remote database contents into the recdb
611  */
612 static int pull_remote_database(struct ctdb_context *ctdb,
613                                 struct ctdb_recoverd *rec, 
614                                 struct ctdb_node_map *nodemap, 
615                                 struct tdb_wrap *recdb, uint32_t dbid,
616                                 bool persistent)
617 {
618         int j;
619
620         /* pull all records from all other nodes across onto this node
621            (this merges based on rsn)
622         */
623         for (j=0; j<nodemap->num; j++) {
624                 /* dont merge from nodes that are unavailable */
625                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
626                         continue;
627                 }
628                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
629                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
630                                  nodemap->nodes[j].pnn));
631                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
632                         return -1;
633                 }
634         }
635         
636         return 0;
637 }
638
639
640 /*
641   update flags on all active nodes
642  */
643 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
644 {
645         int ret;
646
647         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
648         if (ret != 0) {
649                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
650                 return -1;
651         }
652
653         return 0;
654 }
655
656 /*
657   ensure all nodes have the same vnnmap we do
658  */
659 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
660                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
661 {
662         int j, ret;
663
664         /* push the new vnn map out to all the nodes */
665         for (j=0; j<nodemap->num; j++) {
666                 /* dont push to nodes that are unavailable */
667                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
668                         continue;
669                 }
670
671                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
672                 if (ret != 0) {
673                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
674                         return -1;
675                 }
676         }
677
678         return 0;
679 }
680
681
682 struct vacuum_info {
683         struct vacuum_info *next, *prev;
684         struct ctdb_recoverd *rec;
685         uint32_t srcnode;
686         struct ctdb_db_context *ctdb_db;
687         struct ctdb_marshall_buffer *recs;
688         struct ctdb_rec_data *r;
689 };
690
691 static void vacuum_fetch_next(struct vacuum_info *v);
692
693 /*
694   called when a vacuum fetch has completed - just free it and do the next one
695  */
696 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
697 {
698         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
699         talloc_free(state);
700         vacuum_fetch_next(v);
701 }
702
703
704 /*
705   process the next element from the vacuum list
706 */
707 static void vacuum_fetch_next(struct vacuum_info *v)
708 {
709         struct ctdb_call call;
710         struct ctdb_rec_data *r;
711
712         while (v->recs->count) {
713                 struct ctdb_client_call_state *state;
714                 TDB_DATA data;
715                 struct ctdb_ltdb_header *hdr;
716
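                /* a null call with CTDB_IMMEDIATE_MIGRATION pulls the
                   record back to this node without running any call
                   function */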
717                 ZERO_STRUCT(call);
718                 call.call_id = CTDB_NULL_FUNC;
719                 call.flags = CTDB_IMMEDIATE_MIGRATION;
720                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
721
722                 r = v->r;
723                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
724                 v->recs->count--;
725
726                 call.key.dptr = &r->data[0];
727                 call.key.dsize = r->keylen;
728
729                 /* ensure we don't block this daemon - just skip a record if we can't get
730                    the chainlock */
731                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
732                         continue;
733                 }
734
735                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
736                 if (data.dptr == NULL) {
737                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
738                         continue;
739                 }
740
741                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
742                         free(data.dptr);
743                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
744                         continue;
745                 }
746                 
747                 hdr = (struct ctdb_ltdb_header *)data.dptr;
748                 if (hdr->dmaster == v->rec->ctdb->pnn) {
749                         /* its already local */
750                         free(data.dptr);
751                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
752                         continue;
753                 }
754
755                 free(data.dptr);
756
757                 state = ctdb_call_send(v->ctdb_db, &call);
758                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
759                 if (state == NULL) {
760                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
761                         talloc_free(v);
762                         return;
763                 }
764                 state->async.fn = vacuum_fetch_callback;
765                 state->async.private_data = v;
766                 return;
767         }
768
769         talloc_free(v);
770 }
771
772
773 /*
774   destroy a vacuum info structure
775  */
776 static int vacuum_info_destructor(struct vacuum_info *v)
777 {
778         DLIST_REMOVE(v->rec->vacuum_info, v);
779         return 0;
780 }
781
782
783 /*
784   handler for vacuum fetch
785 */
786 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
787                                  TDB_DATA data, void *private_data)
788 {
789         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
790         struct ctdb_marshall_buffer *recs;
791         int ret, i;
792         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
793         const char *name;
794         struct ctdb_dbid_map *dbmap=NULL;
795         bool persistent = false;
796         struct ctdb_db_context *ctdb_db;
797         struct ctdb_rec_data *r;
798         uint32_t srcnode;
799         struct vacuum_info *v;
800
801         recs = (struct ctdb_marshall_buffer *)data.dptr;
802         r = (struct ctdb_rec_data *)&recs->data[0];
803
804         if (recs->count == 0) {
805                 talloc_free(tmp_ctx);
806                 return;
807         }
808
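        /* the pnn of the node that sent these records is carried in the
           reqid field of the first marshalled record */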
809         srcnode = r->reqid;
810
811         for (v=rec->vacuum_info;v;v=v->next) {
812                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
813                         /* we're already working on records from this node */
814                         talloc_free(tmp_ctx);
815                         return;
816                 }
817         }
818
819         /* work out if the database is persistent */
820         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
821         if (ret != 0) {
822                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
823                 talloc_free(tmp_ctx);
824                 return;
825         }
826
827         for (i=0;i<dbmap->num;i++) {
828                 if (dbmap->dbs[i].dbid == recs->db_id) {
829                         persistent = dbmap->dbs[i].persistent;
830                         break;
831                 }
832         }
833         if (i == dbmap->num) {
834                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
835                 talloc_free(tmp_ctx);
836                 return;         
837         }
838
839         /* find the name of this database */
840         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
841                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
842                 talloc_free(tmp_ctx);
843                 return;
844         }
845
846         /* attach to it */
847         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
848         if (ctdb_db == NULL) {
849                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
850                 talloc_free(tmp_ctx);
851                 return;
852         }
853
854         v = talloc_zero(rec, struct vacuum_info);
855         if (v == NULL) {
856                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
857                 talloc_free(tmp_ctx);
858                 return;
859         }
860
861         v->rec = rec;
862         v->srcnode = srcnode;
863         v->ctdb_db = ctdb_db;
864         v->recs = talloc_memdup(v, recs, data.dsize);
865         if (v->recs == NULL) {
866                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
867                 talloc_free(v);
868                 talloc_free(tmp_ctx);
869                 return;         
870         }
871         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
872
873         DLIST_ADD(rec->vacuum_info, v);
874
875         talloc_set_destructor(v, vacuum_info_destructor);
876
877         vacuum_fetch_next(v);
878         talloc_free(tmp_ctx);
879 }
880
881
882 /*
883   called when ctdb_wait_timeout should finish
884  */
885 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
886                               struct timeval yt, void *p)
887 {
888         uint32_t *timed_out = (uint32_t *)p;
889         (*timed_out) = 1;
890 }
891
892 /*
893   wait for a given number of seconds
894  */
895 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
896 {
897         uint32_t timed_out = 0;
898         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
899         while (!timed_out) {
900                 event_loop_once(ctdb->ev);
901         }
902 }
903
904 /*
905   called when an election times out (ends)
906  */
907 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
908                                   struct timeval t, void *p)
909 {
910         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
911         rec->election_timeout = NULL;
912
913         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
914 }
915
916
917 /*
918   wait for an election to finish. It finishes election_timeout seconds after
919   the last election packet is received
920  */
921 static void ctdb_wait_election(struct ctdb_recoverd *rec)
922 {
923         struct ctdb_context *ctdb = rec->ctdb;
924         while (rec->election_timeout) {
925                 event_loop_once(ctdb->ev);
926         }
927 }
928
929 /*
930   Update our local flags from all remote connected nodes. 
931   This is only run when we are, or believe we are, the recovery master
932  */
933 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
934 {
935         int j;
936         struct ctdb_context *ctdb = rec->ctdb;
937         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
938
939         /* get the nodemap for all active remote nodes and verify
940            they are the same as for this node
941          */
942         for (j=0; j<nodemap->num; j++) {
943                 struct ctdb_node_map *remote_nodemap=NULL;
944                 int ret;
945
946                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
947                         continue;
948                 }
949                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
950                         continue;
951                 }
952
953                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
954                                            mem_ctx, &remote_nodemap);
955                 if (ret != 0) {
956                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
957                                   nodemap->nodes[j].pnn));
958                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
959                         talloc_free(mem_ctx);
960                         return MONITOR_FAILED;
961                 }
962                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
963                         /* We should tell our daemon about this so it
964                            updates its flags or else we will log the same 
965                            message again in the next iteration of recovery.
966                            Since we are the recovery master we can just as
967                            well update the flags on all nodes.
968                         */
969                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
970                         if (ret != 0) {
971                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
972                                 return MONITOR_FAILED;
973                         }
974
975                         /* Update our local copy of the flags in the recovery
976                            daemon.
977                         */
978                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
979                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
980                                  nodemap->nodes[j].flags));
981                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
982                 }
983                 talloc_free(remote_nodemap);
984         }
985         talloc_free(mem_ctx);
986         return MONITOR_OK;
987 }
988
989
990 /* Create a new random generation id. 
991    The generation id can not be the INVALID_GENERATION id
992 */
993 static uint32_t new_generation(void)
994 {
995         uint32_t generation;
996
997         while (1) {
998                 generation = random();
999
1000                 if (generation != INVALID_GENERATION) {
1001                         break;
1002                 }
1003         }
1004
1005         return generation;
1006 }
1007
1008
1009 /*
1010   create a temporary working database
1011  */
1012 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1013 {
1014         char *name;
1015         struct tdb_wrap *recdb;
1016         unsigned tdb_flags;
1017
1018         /* open up the temporary recovery database */
1019         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1020                                ctdb->db_directory_state,
1021                                ctdb->pnn);
1022         if (name == NULL) {
1023                 return NULL;
1024         }
1025         unlink(name);
1026
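        /* the recovery db is only ever accessed by this process, so tdb
           locking is not needed; mmap is disabled when running under
           valgrind, presumably so accesses can be tracked accurately */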
1027         tdb_flags = TDB_NOLOCK;
1028         if (ctdb->valgrinding) {
1029                 tdb_flags |= TDB_NOMMAP;
1030         }
1031         tdb_flags |= TDB_DISALLOW_NESTING;
1032
1033         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1034                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1035         if (recdb == NULL) {
1036                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1037         }
1038
1039         talloc_free(name);
1040
1041         return recdb;
1042 }
1043
1044
1045 /* 
1046    a traverse function for pulling all relevant records from recdb
1047  */
1048 struct recdb_data {
1049         struct ctdb_context *ctdb;
1050         struct ctdb_marshall_buffer *recdata;
1051         uint32_t len;
1052         bool failed;
1053         bool persistent;
1054 };
1055
1056 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1057 {
1058         struct recdb_data *params = (struct recdb_data *)p;
1059         struct ctdb_rec_data *rec;
1060         struct ctdb_ltdb_header *hdr;
1061
1062         /* skip empty records */
1063         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1064                 return 0;
1065         }
1066
1067         /* update the dmaster field to point to us */
1068         hdr = (struct ctdb_ltdb_header *)data.dptr;
1069         if (!params->persistent) {
1070                 hdr->dmaster = params->ctdb->pnn;
1071                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1072         }
1073
1074         /* add the record to the blob ready to send to the nodes */
1075         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1076         if (rec == NULL) {
1077                 params->failed = true;
1078                 return -1;
1079         }
1080         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1081         if (params->recdata == NULL) {
1082                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1083                          rec->length + params->len));
1084                 params->failed = true;
1085                 return -1;
1086         }
1087         params->recdata->count++;
1088         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1089         params->len += rec->length;
1090         talloc_free(rec);
1091
1092         return 0;
1093 }
1094
1095 /*
1096   push the recdb database out to all nodes
1097  */
1098 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1099                                bool persistent,
1100                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1101 {
1102         struct recdb_data params;
1103         struct ctdb_marshall_buffer *recdata;
1104         TDB_DATA outdata;
1105         TALLOC_CTX *tmp_ctx;
1106         uint32_t *nodes;
1107
1108         tmp_ctx = talloc_new(ctdb);
1109         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1110
1111         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1112         CTDB_NO_MEMORY(ctdb, recdata);
1113
1114         recdata->db_id = dbid;
1115
1116         params.ctdb = ctdb;
1117         params.recdata = recdata;
1118         params.len = offsetof(struct ctdb_marshall_buffer, data);
1119         params.failed = false;
1120         params.persistent = persistent;
1121
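        /* walk the temporary recovery db and marshall every record into a
           single blob that can be pushed to all nodes with one PUSH_DB
           control */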
1122         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1123                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1124                 talloc_free(params.recdata);
1125                 talloc_free(tmp_ctx);
1126                 return -1;
1127         }
1128
1129         if (params.failed) {
1130                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1131                 talloc_free(params.recdata);
1132                 talloc_free(tmp_ctx);
1133                 return -1;              
1134         }
1135
1136         recdata = params.recdata;
1137
1138         outdata.dptr = (void *)recdata;
1139         outdata.dsize = params.len;
1140
1141         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1142         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1143                                         nodes, 0,
1144                                         CONTROL_TIMEOUT(), false, outdata,
1145                                         NULL, NULL,
1146                                         NULL) != 0) {
1147                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1148                 talloc_free(recdata);
1149                 talloc_free(tmp_ctx);
1150                 return -1;
1151         }
1152
1153         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1154                   dbid, recdata->count));
1155
1156         talloc_free(recdata);
1157         talloc_free(tmp_ctx);
1158
1159         return 0;
1160 }
1161
1162
1163 /*
1164   go through a full recovery on one database 
1165  */
1166 static int recover_database(struct ctdb_recoverd *rec, 
1167                             TALLOC_CTX *mem_ctx,
1168                             uint32_t dbid,
1169                             bool persistent,
1170                             uint32_t pnn, 
1171                             struct ctdb_node_map *nodemap,
1172                             uint32_t transaction_id)
1173 {
1174         struct tdb_wrap *recdb;
1175         int ret;
1176         struct ctdb_context *ctdb = rec->ctdb;
1177         TDB_DATA data;
1178         struct ctdb_control_wipe_database w;
1179         uint32_t *nodes;
1180
1181         recdb = create_recdb(ctdb, mem_ctx);
1182         if (recdb == NULL) {
1183                 return -1;
1184         }
1185
1186         /* pull all remote databases onto the recdb */
1187         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1188         if (ret != 0) {
1189                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1190                 return -1;
1191         }
1192
1193         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1194
1195         /* wipe all the remote databases. This is safe as we are in a transaction */
1196         w.db_id = dbid;
1197         w.transaction_id = transaction_id;
1198
1199         data.dptr = (void *)&w;
1200         data.dsize = sizeof(w);
1201
1202         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1203         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1204                                         nodes, 0,
1205                                         CONTROL_TIMEOUT(), false, data,
1206                                         NULL, NULL,
1207                                         NULL) != 0) {
1208                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1209                 talloc_free(recdb);
1210                 return -1;
1211         }
1212         
1213         /* push out the correct database. This sets the dmaster and skips 
1214            the empty records */
1215         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1216         if (ret != 0) {
1217                 talloc_free(recdb);
1218                 return -1;
1219         }
1220
1221         /* all done with this database */
1222         talloc_free(recdb);
1223
1224         return 0;
1225 }
1226
1227 /*
1228   reload the nodes file 
1229 */
1230 static void reload_nodes_file(struct ctdb_context *ctdb)
1231 {
1232         ctdb->nodes = NULL;
1233         ctdb_load_nodes_file(ctdb);
1234 }
1235
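/*
  refresh the per-node lists of known and available public ips. if any
  node has an inconsistent ip allocation a takeover run is scheduled.
  on failure the offending node is reported through *culprit.
 */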
1236 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1237                                          struct ctdb_recoverd *rec,
1238                                          struct ctdb_node_map *nodemap,
1239                                          uint32_t *culprit)
1240 {
1241         int j;
1242         int ret;
1243
1244         if (ctdb->num_nodes != nodemap->num) {
1245                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1246                                   ctdb->num_nodes, nodemap->num));
1247                 if (culprit) {
1248                         *culprit = ctdb->pnn;
1249                 }
1250                 return -1;
1251         }
1252
1253         for (j=0; j<nodemap->num; j++) {
1254                 /* release any existing data */
1255                 if (ctdb->nodes[j]->known_public_ips) {
1256                         talloc_free(ctdb->nodes[j]->known_public_ips);
1257                         ctdb->nodes[j]->known_public_ips = NULL;
1258                 }
1259                 if (ctdb->nodes[j]->available_public_ips) {
1260                         talloc_free(ctdb->nodes[j]->available_public_ips);
1261                         ctdb->nodes[j]->available_public_ips = NULL;
1262                 }
1263
1264                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1265                         continue;
1266                 }
1267
1268                 /* grab a new shiny list of public ips from the node */
1269                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1270                                         CONTROL_TIMEOUT(),
1271                                         ctdb->nodes[j]->pnn,
1272                                         ctdb->nodes,
1273                                         0,
1274                                         &ctdb->nodes[j]->known_public_ips);
1275                 if (ret != 0) {
1276                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1277                                 ctdb->nodes[j]->pnn));
1278                         if (culprit) {
1279                                 *culprit = ctdb->nodes[j]->pnn;
1280                         }
1281                         return -1;
1282                 }
1283
1284                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1285                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1286                         rec->need_takeover_run = true;
1287                 }
1288
1289                 /* grab a new shiny list of public ips from the node */
1290                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1291                                         CONTROL_TIMEOUT(),
1292                                         ctdb->nodes[j]->pnn,
1293                                         ctdb->nodes,
1294                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1295                                         &ctdb->nodes[j]->available_public_ips);
1296                 if (ret != 0) {
1297                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1298                                 ctdb->nodes[j]->pnn));
1299                         if (culprit) {
1300                                 *culprit = ctdb->nodes[j]->pnn;
1301                         }
1302                         return -1;
1303                 }
1304         }
1305
1306         return 0;
1307 }
1308
1309 /*
1310   we are the recmaster, and recovery is needed - start a recovery run
1311  */
1312 static int do_recovery(struct ctdb_recoverd *rec, 
1313                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1314                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1315 {
1316         struct ctdb_context *ctdb = rec->ctdb;
1317         int i, j, ret;
1318         uint32_t generation;
1319         struct ctdb_dbid_map *dbmap;
1320         TDB_DATA data;
1321         uint32_t *nodes;
1322         struct timeval start_time;
1323         uint32_t culprit = (uint32_t)-1;
1324
1325         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1326
1327         /* if recovery fails, force it again */
1328         rec->need_recovery = true;
1329
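        /* ban any node that has accumulated at least 2 * num_nodes
           culprit credits; the counter is reset in ctdb_set_culprit_count()
           once the recovery grace period has passed without incidents */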
1330         for (i=0; i<ctdb->num_nodes; i++) {
1331                 struct ctdb_banning_state *ban_state;
1332
1333                 if (ctdb->nodes[i]->ban_state == NULL) {
1334                         continue;
1335                 }
1336                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1337                 if (ban_state->count < 2*ctdb->num_nodes) {
1338                         continue;
1339                 }
1340                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1341                         ctdb->nodes[i]->pnn, ban_state->count,
1342                         ctdb->tunable.recovery_ban_period));
1343                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1344                 ban_state->count = 0;
1345         }
1346
1347
1348         if (ctdb->tunable.verify_recovery_lock != 0) {
1349                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1350                 start_time = timeval_current();
1351                 if (!ctdb_recovery_lock(ctdb, true)) {
1352                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1353                                          "and ban ourself for %u seconds\n",
1354                                          ctdb->tunable.recovery_ban_period));
1355                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1356                         return -1;
1357                 }
1358                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1359                 DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1360         }
1361
1362         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1363
1364         /* get a list of all databases */
1365         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1366         if (ret != 0) {
1367                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1368                 return -1;
1369         }
1370
1371         /* we do the db creation before we set the recovery mode, so the freeze happens
1372            on all databases we will be dealing with. */
1373
1374         /* verify that we have all the databases any other node has */
1375         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1376         if (ret != 0) {
1377                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1378                 return -1;
1379         }
1380
1381         /* verify that all other nodes have all our databases */
1382         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1383         if (ret != 0) {
1384                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1385                 return -1;
1386         }
1387         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1388
1389         /* update the database priority for all remote databases */
1390         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1391         if (ret != 0) {
1392                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1393         }
1394         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1395
1396
1397         /* set recovery mode to active on all nodes */
1398         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1399         if (ret != 0) {
1400                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1401                 return -1;
1402         }
1403
1404         /* execute the "startrecovery" event script on all nodes */
1405         ret = run_startrecovery_eventscript(rec, nodemap);
1406         if (ret!=0) {
1407                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1408                 return -1;
1409         }
1410
1411         /*
1412           update all nodes to have the same flags that we have
1413          */
1414         for (i=0;i<nodemap->num;i++) {
1415                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1416                         continue;
1417                 }
1418
1419                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1420                 if (ret != 0) {
1421                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1422                         return -1;
1423                 }
1424         }
1425
1426         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1427
1428         /* pick a new generation number */
1429         generation = new_generation();
1430
1431         /* change the vnnmap on this node to use the new generation 
1432            number but not on any other nodes.
1433            this guarantees that if we abort the recovery prematurely
1434            for some reason (a node stops responding?)
1435            that we can just return immediately and we will reenter
1436            recovery shortly again.
1437            I.e. we deliberately leave the cluster with an inconsistent
1438            generation id to allow us to abort recovery at any stage and
1439            just restart it from scratch.
1440          */
1441         vnnmap->generation = generation;
1442         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1443         if (ret != 0) {
1444                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1445                 return -1;
1446         }
1447
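        /* Start a transaction on every active node, tagged with the new
           generation number. If starting the transaction fails anywhere we
           cancel it everywhere and abort; since the cluster is left with an
           inconsistent generation, recovery will simply be re-entered. */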
1448         data.dptr = (void *)&generation;
1449         data.dsize = sizeof(uint32_t);
1450
1451         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1452         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1453                                         nodes, 0,
1454                                         CONTROL_TIMEOUT(), false, data,
1455                                         NULL,
1456                                         transaction_start_fail_callback,
1457                                         rec) != 0) {
1458                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1459                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1460                                         nodes, 0,
1461                                         CONTROL_TIMEOUT(), false, tdb_null,
1462                                         NULL,
1463                                         NULL,
1464                                         NULL) != 0) {
1465                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1466                 }
1467                 return -1;
1468         }
1469
1470         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1471
1472         for (i=0;i<dbmap->num;i++) {
1473                 ret = recover_database(rec, mem_ctx,
1474                                        dbmap->dbs[i].dbid,
1475                                        dbmap->dbs[i].persistent,
1476                                        pnn, nodemap, generation);
1477                 if (ret != 0) {
1478                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1479                         return -1;
1480                 }
1481         }
1482
1483         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1484
1485         /* commit all the changes */
1486         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1487                                         nodes, 0,
1488                                         CONTROL_TIMEOUT(), false, data,
1489                                         NULL, NULL,
1490                                         NULL) != 0) {
1491                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1492                 return -1;
1493         }
1494
1495         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1496         
1497
1498         /* update the capabilities for all nodes */
1499         ret = update_capabilities(ctdb, nodemap);
1500         if (ret!=0) {
1501                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1502                 return -1;
1503         }
1504
1505         /* build a new vnn map with all the currently active and
1506            unbanned nodes */
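        /* A small example of the map built below: in a four node cluster
           where node 2 is inactive and node 3 does not advertise
           CTDB_CAP_LMASTER, only nodes 0 and 1 are added, giving
           vnnmap->map = { 0, 1 } and vnnmap->size == 2. */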
1507         generation = new_generation();
1508         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1509         CTDB_NO_MEMORY(ctdb, vnnmap);
1510         vnnmap->generation = generation;
1511         vnnmap->size = 0;
1512         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1513         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1514         for (i=j=0;i<nodemap->num;i++) {
1515                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1516                         continue;
1517                 }
1518                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1519                         /* this node can not be an lmaster */
1520                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1521                         continue;
1522                 }
1523
1524                 vnnmap->size++;
1525                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1526                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1527                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1528
1529         }
1530         if (vnnmap->size == 0) {
1531                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1532                 vnnmap->size++;
1533                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1534                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1535                 vnnmap->map[0] = pnn;
1536         }       
1537
1538         /* update to the new vnnmap on all nodes */
1539         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1540         if (ret != 0) {
1541                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1542                 return -1;
1543         }
1544
1545         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1546
1547         /* update recmaster to point to us for all nodes */
1548         ret = set_recovery_master(ctdb, nodemap, pnn);
1549         if (ret!=0) {
1550                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1551                 return -1;
1552         }
1553
1554         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1555
1556         /*
1557           update all nodes to have the same flags that we have
1558          */
1559         for (i=0;i<nodemap->num;i++) {
1560                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1561                         continue;
1562                 }
1563
1564                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1565                 if (ret != 0) {
1566                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1567                         return -1;
1568                 }
1569         }
1570
1571         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1572
1573         /* disable recovery mode */
1574         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1575         if (ret != 0) {
1576                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1577                 return -1;
1578         }
1579
1580         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1581
1582         /*
1583           tell nodes to takeover their public IPs
1584          */
1585         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1586         if (ret != 0) {
1587                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1588                                  culprit));
1589                 return -1;
1590         }
1591         rec->need_takeover_run = false;
1592         ret = ctdb_takeover_run(ctdb, nodemap);
1593         if (ret != 0) {
1594                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1595                 return -1;
1596         }
1597         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1598
1599         /* execute the "recovered" event script on all nodes */
1600         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1601         if (ret!=0) {
1602                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1603                 return -1;
1604         }
1605
1606         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1607
1608         /* send a message to all clients telling them that the cluster 
1609            has been reconfigured */
1610         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1611
1612         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1613
1614         rec->need_recovery = false;
1615
1616         /* we managed to complete a full recovery, make sure to forgive
1617            any past sins by the nodes that could now participate in the
1618            recovery.
1619         */
1620         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1621         for (i=0;i<nodemap->num;i++) {
1622                 struct ctdb_banning_state *ban_state;
1623
1624                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1625                         continue;
1626                 }
1627
1628                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1629                 if (ban_state == NULL) {
1630                         continue;
1631                 }
1632
1633                 ban_state->count = 0;
1634         }
1635
1636
1637         /* We just finished a recovery successfully. 
1638            We now wait for rerecovery_timeout before we allow 
1639            another recovery to take place.
1640         */
1641         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries suppressed for the rerecovery timeout\n"));
1642         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1643         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1644
1645         return 0;
1646 }
1647
1648
1649 /*
1650   elections are won by first checking the number of connected nodes, then
1651   the priority time, then the pnn
1652  */
1653 struct election_message {
1654         uint32_t num_connected;
1655         struct timeval priority_time;
1656         uint32_t pnn;
1657         uint32_t node_flags;
1658 };
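/* Example of how two candidates compare: a node that sees more connected
   nodes always wins; when the counts match, the node whose recovery daemon
   has the older priority_time (i.e. has been running longest) wins; if that
   is also equal, the node with the higher pnn wins. A node without the
   recmaster capability, or one that is banned or stopped, never wins. */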
1659
1660 /*
1661   form this node's election data
1662  */
1663 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1664 {
1665         int ret, i;
1666         struct ctdb_node_map *nodemap;
1667         struct ctdb_context *ctdb = rec->ctdb;
1668
1669         ZERO_STRUCTP(em);
1670
1671         em->pnn = rec->ctdb->pnn;
1672         em->priority_time = rec->priority_time;
1673
1674         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1675         if (ret != 0) {
1676                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1677                 return;
1678         }
1679
1680         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1681         em->node_flags = rec->node_flags;
1682
1683         for (i=0;i<nodemap->num;i++) {
1684                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1685                         em->num_connected++;
1686                 }
1687         }
1688
1689         /* we shouldn't try to win this election if we can't be a recmaster */
1690         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1691                 em->num_connected = 0;
1692                 em->priority_time = timeval_current();
1693         }
1694
1695         talloc_free(nodemap);
1696 }
1697
1698 /*
1699   see if the given election data wins
1700  */
1701 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1702 {
1703         struct election_message myem;
1704         int cmp = 0;
1705
1706         ctdb_election_data(rec, &myem);
1707
1708         /* we can't win if we don't have the recmaster capability */
1709         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1710                 return false;
1711         }
1712
1713         /* we can't win if we are banned */
1714         if (rec->node_flags & NODE_FLAGS_BANNED) {
1715                 return false;
1716         }       
1717
1718         /* we can't win if we are stopped */
1719         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1720                 return false;
1721         }       
1722
1723         /* we will automatically win if the other node is banned */
1724         if (em->node_flags & NODE_FLAGS_BANNED) {
1725                 return true;
1726         }
1727
1728         /* we will automatically win if the other node is stopped */
1729         if (em->node_flags & NODE_FLAGS_STOPPED) {
1730                 return true;
1731         }
1732
1733         /* try to use the most connected node */
1734         if (cmp == 0) {
1735                 cmp = (int)myem.num_connected - (int)em->num_connected;
1736         }
1737
1738         /* then the longest running node */
1739         if (cmp == 0) {
1740                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1741         }
1742
1743         if (cmp == 0) {
1744                 cmp = (int)myem.pnn - (int)em->pnn;
1745         }
1746
1747         return cmp > 0;
1748 }
1749
1750 /*
1751   send out an election request
1752  */
1753 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1754 {
1755         int ret;
1756         TDB_DATA election_data;
1757         struct election_message emsg;
1758         uint64_t srvid;
1759         struct ctdb_context *ctdb = rec->ctdb;
1760
1761         srvid = CTDB_SRVID_RECOVERY;
1762
1763         ctdb_election_data(rec, &emsg);
1764
1765         election_data.dsize = sizeof(struct election_message);
1766         election_data.dptr  = (unsigned char *)&emsg;
1767
1768
1769         /* send an election message to all active nodes */
1770         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1771         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1772
1773
1774         /* A new node that is already frozen has entered the cluster.
1775            The existing nodes are not frozen and don't need to be frozen
1776            until the election has ended and we start the actual recovery
1777         */
1778         if (update_recmaster == true) {
1779                 /* first we assume we will win the election and set 
1780                    recoverymaster to be ourself on the current node
1781                  */
1782                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1783                 if (ret != 0) {
1784                         DEBUG(DEBUG_ERR, (__location__ " failed to set ourselves as recmaster on the local node\n"));
1785                         return -1;
1786                 }
1787         }
1788
1789
1790         return 0;
1791 }
1792
1793 /*
1794   this function will unban all nodes in the cluster
1795 */
1796 static void unban_all_nodes(struct ctdb_context *ctdb)
1797 {
1798         int ret, i;
1799         struct ctdb_node_map *nodemap;
1800         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1801         
1802         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1803         if (ret != 0) {
1804                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1805                 return;
1806         }
1807
1808         for (i=0;i<nodemap->num;i++) {
1809                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1810                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1811                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1812                 }
1813         }
1814
1815         talloc_free(tmp_ctx);
1816 }
1817
1818
1819 /*
1820   we think we are winning the election - send a broadcast election request
1821  */
1822 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1823 {
1824         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1825         int ret;
1826
1827         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1828         if (ret != 0) {
1829                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1830         }
1831
1832         talloc_free(rec->send_election_te);
1833         rec->send_election_te = NULL;
1834 }
1835
1836 /*
1837   handler for memory dumps
1838 */
1839 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1840                              TDB_DATA data, void *private_data)
1841 {
1842         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1843         TDB_DATA *dump;
1844         int ret;
1845         struct rd_memdump_reply *rd;
1846
1847         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1848                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1849                 talloc_free(tmp_ctx);
1850                 return;
1851         }
1852         rd = (struct rd_memdump_reply *)data.dptr;
1853
1854         dump = talloc_zero(tmp_ctx, TDB_DATA);
1855         if (dump == NULL) {
1856                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1857                 talloc_free(tmp_ctx);
1858                 return;
1859         }
1860         ret = ctdb_dump_memory(ctdb, dump);
1861         if (ret != 0) {
1862                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1863                 talloc_free(tmp_ctx);
1864                 return;
1865         }
1866
1867         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1868
1869         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1870         if (ret != 0) {
1871                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1872                 talloc_free(tmp_ctx);
1873                 return;
1874         }
1875
1876         talloc_free(tmp_ctx);
1877 }
1878
1879 /*
1880   handler for reload_nodes
1881 */
1882 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1883                              TDB_DATA data, void *private_data)
1884 {
1885         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1886
1887         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1888
1889         reload_nodes_file(rec->ctdb);
1890 }
1891
1892
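/*
  called when the "disable ip check" period has expired; verification of the
  local public ip allocation is switched back on by dropping the disable
  context.
*/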
1893 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1894                               struct timeval yt, void *p)
1895 {
1896         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1897
1898         talloc_free(rec->ip_check_disable_ctx);
1899         rec->ip_check_disable_ctx = NULL;
1900 }
1901
1902
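/*
  handler for public ip assignment updates; only the current recovery master
  records the assignment in its ip tree, all other nodes ignore the message.
*/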
1903 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1904                              TDB_DATA data, void *private_data)
1905 {
1906         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1907         struct ctdb_public_ip *ip;
1908
1909         if (rec->recmaster != rec->ctdb->pnn) {
1910                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1911                 return;
1912         }
1913
1914         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1915                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1916                 return;
1917         }
1918
1919         ip = (struct ctdb_public_ip *)data.dptr;
1920
1921         update_ip_assignment_tree(rec->ctdb, ip);
1922 }
1923
1924
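/*
  handler for the "disable ip check" message; the payload is a uint32_t
  number of seconds for which the local public ip verification should be
  suspended, enforced by the timed event set up below.
*/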
1925 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1926                              TDB_DATA data, void *private_data)
1927 {
1928         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1929         uint32_t timeout;
1930
1931         if (rec->ip_check_disable_ctx != NULL) {
1932                 talloc_free(rec->ip_check_disable_ctx);
1933                 rec->ip_check_disable_ctx = NULL;
1934         }
1935
1936         if (data.dsize != sizeof(uint32_t)) {
1937                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1938                                  "expecting %lu\n", (long unsigned)data.dsize,
1939                                  (long unsigned)sizeof(uint32_t)));
1940                 return;
1941         }
1942         if (data.dptr == NULL) {
1943                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1944                 return;
1945         }
1946
1947         timeout = *((uint32_t *)data.dptr);
1948         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1949
1950         rec->ip_check_disable_ctx = talloc_new(rec);
1951         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1952
1953         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1954 }
1955
1956
1957 /*
1958   handler for ip reallocate, just add it to the list of callers and 
1959   handle this later in the monitor_cluster loop so we do not recurse
1960   with other callers to takeover_run()
1961 */
1962 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1963                              TDB_DATA data, void *private_data)
1964 {
1965         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1966         struct ip_reallocate_list *caller;
1967
1968         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1969                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1970                 return;
1971         }
1972
1973         if (rec->ip_reallocate_ctx == NULL) {
1974                 rec->ip_reallocate_ctx = talloc_new(rec);
1975                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
1976         }
1977
1978         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1979         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1980
1981         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1982         caller->next = rec->reallocate_callers;
1983         rec->reallocate_callers = caller;
1984
1985         return;
1986 }
1987
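/*
  called from the monitor_cluster loop on the recovery master to service all
  queued ip reallocate callers with a single takeover run; every caller that
  supplied a non-zero srvid gets the result sent back as a message.
*/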
1988 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1989 {
1990         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1991         TDB_DATA result;
1992         int32_t ret;
1993         struct ip_reallocate_list *callers;
1994         uint32_t culprit;
1995
1996         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1997
1998         /* update the list of public ips that a node can handle for
1999            all connected nodes
2000         */
2001         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2002         if (ret != 0) {
2003                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2004                                  culprit));
2005                 rec->need_takeover_run = true;
2006         }
2007         if (ret == 0) {
2008                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2009                 if (ret != 0) {
2010                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: "
2011                                          "ctdb_takeover_run() failed\n"));
2012                         rec->need_takeover_run = true;
2013                 }
2014         }
2015
2016         result.dsize = sizeof(int32_t);
2017         result.dptr  = (uint8_t *)&ret;
2018
2019         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2020
2021                 /* Someone that sent srvid==0 does not want a reply */
2022                 if (callers->rd->srvid == 0) {
2023                         continue;
2024                 }
2025                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2026                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2027                                   (unsigned long long)callers->rd->srvid));
2028                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2029                 if (ret != 0) {
2030                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2031                                          "message to %u:%llu\n",
2032                                          (unsigned)callers->rd->pnn,
2033                                          (unsigned long long)callers->rd->srvid));
2034                 }
2035         }
2036
2037         talloc_free(tmp_ctx);
2038         talloc_free(rec->ip_reallocate_ctx);
2039         rec->ip_reallocate_ctx = NULL;
2040         rec->reallocate_callers = NULL;
2041         
2042 }
2043
2044
2045 /*
2046   handler for recovery master elections
2047 */
2048 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2049                              TDB_DATA data, void *private_data)
2050 {
2051         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2052         int ret;
2053         struct election_message *em = (struct election_message *)data.dptr;
2054         TALLOC_CTX *mem_ctx;
2055
2056         /* we got an election packet - update the timeout for the election */
2057         talloc_free(rec->election_timeout);
2058         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2059                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2060                                                 ctdb_election_timeout, rec);
2061
2062         mem_ctx = talloc_new(ctdb);
2063
2064         /* someone called an election. check their election data
2065            and if we disagree and we would rather be the elected node, 
2066            send a new election message to all other nodes
2067          */
2068         if (ctdb_election_win(rec, em)) {
2069                 if (!rec->send_election_te) {
2070                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2071                                                                 timeval_current_ofs(0, 500000),
2072                                                                 election_send_request, rec);
2073                 }
2074                 talloc_free(mem_ctx);
2075                 /*unban_all_nodes(ctdb);*/
2076                 return;
2077         }
2078         
2079         /* we didn't win */
2080         talloc_free(rec->send_election_te);
2081         rec->send_election_te = NULL;
2082
2083         if (ctdb->tunable.verify_recovery_lock != 0) {
2084                 /* release the recmaster lock */
2085                 if (em->pnn != ctdb->pnn &&
2086                     ctdb->recovery_lock_fd != -1) {
2087                         close(ctdb->recovery_lock_fd);
2088                         ctdb->recovery_lock_fd = -1;
2089                         unban_all_nodes(ctdb);
2090                 }
2091         }
2092
2093         /* ok, let that guy become recmaster then */
2094         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2095         if (ret != 0) {
2096                 DEBUG(DEBUG_ERR, (__location__ " failed to set the new recmaster\n"));
2097                 talloc_free(mem_ctx);
2098                 return;
2099         }
2100
2101         talloc_free(mem_ctx);
2102         return;
2103 }
2104
2105
2106 /*
2107   force the start of the election process
2108  */
2109 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2110                            struct ctdb_node_map *nodemap)
2111 {
2112         int ret;
2113         struct ctdb_context *ctdb = rec->ctdb;
2114
2115         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2116
2117         /* set all nodes to recovery mode to stop all internode traffic */
2118         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2119         if (ret != 0) {
2120                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2121                 return;
2122         }
2123
2124         talloc_free(rec->election_timeout);
2125         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2126                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2127                                                 ctdb_election_timeout, rec);
2128
2129         ret = send_election_request(rec, pnn, true);
2130         if (ret!=0) {
2131                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2132                 return;
2133         }
2134
2135         /* wait for a few seconds to collect all responses */
2136         ctdb_wait_election(rec);
2137 }
2138
2139
2140
2141 /*
2142   handler for when a node changes its flags
2143 */
2144 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2145                             TDB_DATA data, void *private_data)
2146 {
2147         int ret;
2148         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2149         struct ctdb_node_map *nodemap=NULL;
2150         TALLOC_CTX *tmp_ctx;
2151         int i;
2152         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2153         int disabled_flag_changed;
2154
2155         if (data.dsize != sizeof(*c)) {
2156                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2157                 return;
2158         }
2159
2160         tmp_ctx = talloc_new(ctdb);
2161         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2162
2163         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2164         if (ret != 0) {
2165                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2166                 talloc_free(tmp_ctx);
2167                 return;         
2168         }
2169
2170
2171         for (i=0;i<nodemap->num;i++) {
2172                 if (nodemap->nodes[i].pnn == c->pnn) break;
2173         }
2174
2175         if (i == nodemap->num) {
2176                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2177                 talloc_free(tmp_ctx);
2178                 return;
2179         }
2180
2181         if (nodemap->nodes[i].flags != c->new_flags) {
2182                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2183         }
2184
2185         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2186
2187         nodemap->nodes[i].flags = c->new_flags;
2188
2189         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2190                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2191
2192         if (ret == 0) {
2193                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2194                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2195         }
2196         
2197         if (ret == 0 &&
2198             ctdb->recovery_master == ctdb->pnn &&
2199             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2200                 /* Only do the takeover run if the permanently disabled or unhealthy
2201                    flags changed since these will cause an ip failover but not
2202                    a recovery.
2203                    If the node became disconnected or banned this will also
2204                    lead to an ip address failover but that is handled 
2205                    during recovery
2206                 */
2207                 if (disabled_flag_changed) {
2208                         rec->need_takeover_run = true;
2209                 }
2210         }
2211
2212         talloc_free(tmp_ctx);
2213 }
2214
2215 /*
2216   handler for when we need to push out flag changes to all other nodes
2217 */
2218 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2219                             TDB_DATA data, void *private_data)
2220 {
2221         int ret;
2222         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2223         struct ctdb_node_map *nodemap=NULL;
2224         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2225         uint32_t recmaster;
2226         uint32_t *nodes;
2227
2228         /* find the recovery master */
2229         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2230         if (ret != 0) {
2231                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2232                 talloc_free(tmp_ctx);
2233                 return;
2234         }
2235
2236         /* read the node flags from the recmaster */
2237         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2238         if (ret != 0) {
2239                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster %u\n", recmaster));
2240                 talloc_free(tmp_ctx);
2241                 return;
2242         }
2243         if (c->pnn >= nodemap->num) {
2244                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2245                 talloc_free(tmp_ctx);
2246                 return;
2247         }
2248
2249         /* send the flags update to all connected nodes */
2250         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2251
2252         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2253                                       nodes, 0, CONTROL_TIMEOUT(),
2254                                       false, data,
2255                                       NULL, NULL,
2256                                       NULL) != 0) {
2257                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2258
2259                 talloc_free(tmp_ctx);
2260                 return;
2261         }
2262
2263         talloc_free(tmp_ctx);
2264 }
2265
2266
2267 struct verify_recmode_normal_data {
2268         uint32_t count;
2269         enum monitor_result status;
2270 };
2271
2272 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2273 {
2274         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2275
2276
2277         /* one more node has responded with recmode data*/
2278         rmdata->count--;
2279
2280         /* if we failed to get the recmode, then return an error and let
2281            the main loop try again.
2282         */
2283         if (state->state != CTDB_CONTROL_DONE) {
2284                 if (rmdata->status == MONITOR_OK) {
2285                         rmdata->status = MONITOR_FAILED;
2286                 }
2287                 return;
2288         }
2289
2290         /* if we got a response, then the recmode will be stored in the
2291            status field
2292         */
2293         if (state->status != CTDB_RECOVERY_NORMAL) {
2294                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2295                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2296         }
2297
2298         return;
2299 }
2300
2301
2302 /* verify that all nodes are in normal recovery mode */
2303 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2304 {
2305         struct verify_recmode_normal_data *rmdata;
2306         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2307         struct ctdb_client_control_state *state;
2308         enum monitor_result status;
2309         int j;
2310         
2311         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2312         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2313         rmdata->count  = 0;
2314         rmdata->status = MONITOR_OK;
2315
2316         /* loop over all active nodes and send an async getrecmode call to 
2317            them*/
2318         for (j=0; j<nodemap->num; j++) {
2319                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2320                         continue;
2321                 }
2322                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2323                                         CONTROL_TIMEOUT(), 
2324                                         nodemap->nodes[j].pnn);
2325                 if (state == NULL) {
2326                         /* we failed to send the control, treat this as 
2327                            an error and try again next iteration
2328                         */                      
2329                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2330                         talloc_free(mem_ctx);
2331                         return MONITOR_FAILED;
2332                 }
2333
2334                 /* set up the callback functions */
2335                 state->async.fn = verify_recmode_normal_callback;
2336                 state->async.private_data = rmdata;
2337
2338                 /* one more control to wait for to complete */
2339                 rmdata->count++;
2340         }
2341
2342
2343         /* now wait for up to the maximum number of seconds allowed
2344            or until all nodes we expect a response from have replied
2345         */
2346         while (rmdata->count > 0) {
2347                 event_loop_once(ctdb->ev);
2348         }
2349
2350         status = rmdata->status;
2351         talloc_free(mem_ctx);
2352         return status;
2353 }
2354
2355
2356 struct verify_recmaster_data {
2357         struct ctdb_recoverd *rec;
2358         uint32_t count;
2359         uint32_t pnn;
2360         enum monitor_result status;
2361 };
2362
2363 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2364 {
2365         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2366
2367
2368         /* one more node has responded with recmaster data*/
2369         rmdata->count--;
2370
2371         /* if we failed to get the recmaster, then return an error and let
2372            the main loop try again.
2373         */
2374         if (state->state != CTDB_CONTROL_DONE) {
2375                 if (rmdata->status == MONITOR_OK) {
2376                         rmdata->status = MONITOR_FAILED;
2377                 }
2378                 return;
2379         }
2380
2381         /* if we got a response, then the recmaster will be stored in the
2382            status field
2383         */
2384         if (state->status != rmdata->pnn) {
2385                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2386                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2387                 rmdata->status = MONITOR_ELECTION_NEEDED;
2388         }
2389
2390         return;
2391 }
2392
2393
2394 /* verify that all nodes agree that we are the recmaster */
2395 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2396 {
2397         struct ctdb_context *ctdb = rec->ctdb;
2398         struct verify_recmaster_data *rmdata;
2399         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2400         struct ctdb_client_control_state *state;
2401         enum monitor_result status;
2402         int j;
2403         
2404         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2405         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2406         rmdata->rec    = rec;
2407         rmdata->count  = 0;
2408         rmdata->pnn    = pnn;
2409         rmdata->status = MONITOR_OK;
2410
2411         /* loop over all active nodes and send an async getrecmaster call to 
2412            them*/
2413         for (j=0; j<nodemap->num; j++) {
2414                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2415                         continue;
2416                 }
2417                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2418                                         CONTROL_TIMEOUT(),
2419                                         nodemap->nodes[j].pnn);
2420                 if (state == NULL) {
2421                         /* we failed to send the control, treat this as 
2422                            an error and try again next iteration
2423                         */                      
2424                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2425                         talloc_free(mem_ctx);
2426                         return MONITOR_FAILED;
2427                 }
2428
2429                 /* set up the callback functions */
2430                 state->async.fn = verify_recmaster_callback;
2431                 state->async.private_data = rmdata;
2432
2433                 /* one more control to wait for to complete */
2434                 rmdata->count++;
2435         }
2436
2437
2438         /* now wait for up to the maximum number of seconds allowed
2439            or until all nodes we expect a response from have replied
2440         */
2441         while (rmdata->count > 0) {
2442                 event_loop_once(ctdb->ev);
2443         }
2444
2445         status = rmdata->status;
2446         talloc_free(mem_ctx);
2447         return status;
2448 }
2449
2450
2451 /* called to check that the local allocation of public ip addresses is ok.
2452 */
2453 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2454 {
2455         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2456         struct ctdb_control_get_ifaces *ifaces = NULL;
2457         struct ctdb_all_public_ips *ips = NULL;
2458         struct ctdb_uptime *uptime1 = NULL;
2459         struct ctdb_uptime *uptime2 = NULL;
2460         int ret, j;
2461         bool need_iface_check = false;
2462         bool need_takeover_run = false;
2463
2464         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2465                                 CTDB_CURRENT_NODE, &uptime1);
2466         if (ret != 0) {
2467                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2468                 talloc_free(mem_ctx);
2469                 return -1;
2470         }
2471
2472
2473         /* read the interfaces from the local node */
2474         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2475         if (ret != 0) {
2476                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2477                 talloc_free(mem_ctx);
2478                 return -1;
2479         }
2480
2481         if (!rec->ifaces) {
2482                 need_iface_check = true;
2483         } else if (rec->ifaces->num != ifaces->num) {
2484                 need_iface_check = true;
2485         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2486                 need_iface_check = true;
2487         }
2488
2489         if (need_iface_check) {
2490                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2491                                      "local node %u - force takeover run\n",
2492                                      pnn));
2493                 need_takeover_run = true;
2494         }
2495
2496         /* read the ip allocation from the local node */
2497         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2498         if (ret != 0) {
2499                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2500                 talloc_free(mem_ctx);
2501                 return -1;
2502         }
2503
2504         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2505                                 CTDB_CURRENT_NODE, &uptime2);
2506         if (ret != 0) {
2507                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2508                 talloc_free(mem_ctx);
2509                 return -1;
2510         }
2511
2512         /* skip the check if the startrecovery time has changed */
2513         if (timeval_compare(&uptime1->last_recovery_started,
2514                             &uptime2->last_recovery_started) != 0) {
2515                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2516                 talloc_free(mem_ctx);
2517                 return 0;
2518         }
2519
2520         /* skip the check if the endrecovery time has changed */
2521         if (timeval_compare(&uptime1->last_recovery_finished,
2522                             &uptime2->last_recovery_finished) != 0) {
2523                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2524                 talloc_free(mem_ctx);
2525                 return 0;
2526         }
2527
2528         /* skip the check if we have started but not finished recovery */
2529         if (timeval_compare(&uptime1->last_recovery_finished,
2530                             &uptime1->last_recovery_started) != 1) {
2531                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2532                 talloc_free(mem_ctx);
2533
2534                 return 0;
2535         }
2536
2537         talloc_free(rec->ifaces);
2538         rec->ifaces = talloc_steal(rec, ifaces);
2539
2540         /* verify that we have the ip addresses we should have
2541            and we don't have ones we shouldn't have.
2542            if we find an inconsistency we set recmode to
2543            active on the local node and wait for the recmaster
2544            to do a full blown recovery
2545         */
2546         for (j=0; j<ips->num; j++) {
2547                 if (ips->ips[j].pnn == pnn) {
2548                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2549                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2550                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2551                                 need_takeover_run = true;
2552                         }
2553                 } else {
2554                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2555                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2556                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2557                                 need_takeover_run = true;
2558                         }
2559                 }
2560         }
2561
2562         if (need_takeover_run) {
2563                 struct takeover_run_reply rd;
2564                 TDB_DATA data;
2565
2566                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2567
2568                 rd.pnn = ctdb->pnn;
2569                 rd.srvid = 0;
2570                 data.dptr = (uint8_t *)&rd;
2571                 data.dsize = sizeof(rd);
2572
2573                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2574                 if (ret != 0) {
2575                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2576                 }
2577         }
2578         talloc_free(mem_ctx);
2579         return 0;
2580 }
2581
2582
2583 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2584 {
2585         struct ctdb_node_map **remote_nodemaps = callback_data;
2586
2587         if (node_pnn >= ctdb->num_nodes) {
2588                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2589                 return;
2590         }
2591
2592         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2593
2594 }
2595
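/*
  fetch the nodemap from every active node in parallel; each reply is stored
  in the remote_nodemaps array indexed by the pnn of the node it came from
  (see async_getnodemap_callback above).
*/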
2596 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2597         struct ctdb_node_map *nodemap,
2598         struct ctdb_node_map **remote_nodemaps)
2599 {
2600         uint32_t *nodes;
2601
2602         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2603         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2604                                         nodes, 0,
2605                                         CONTROL_TIMEOUT(), false, tdb_null,
2606                                         async_getnodemap_callback,
2607                                         NULL,
2608                                         remote_nodemaps) != 0) {
2609                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2610
2611                 return -1;
2612         }
2613
2614         return 0;
2615 }
2616
2617 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2618 struct ctdb_check_reclock_state {
2619         struct ctdb_context *ctdb;
2620         struct timeval start_time;
2621         int fd[2];
2622         pid_t child;
2623         struct timed_event *te;
2624         struct fd_event *fde;
2625         enum reclock_child_status status;
2626 };
2627
2628 /* when we free the reclock state we must kill any child process.
2629 */
2630 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2631 {
2632         struct ctdb_context *ctdb = state->ctdb;
2633
2634         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2635
2636         if (state->fd[0] != -1) {
2637                 close(state->fd[0]);
2638                 state->fd[0] = -1;
2639         }
2640         if (state->fd[1] != -1) {
2641                 close(state->fd[1]);
2642                 state->fd[1] = -1;
2643         }
2644         kill(state->child, SIGKILL);
2645         return 0;
2646 }
2647
2648 /*
2649   called if our check_reclock child times out. this would happen if
2650   i/o to the reclock file blocks.
2651  */
2652 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2653                                          struct timeval t, void *private_data)
2654 {
2655         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2656                                            struct ctdb_check_reclock_state);
2657
2658         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - CFS slow to grant locks?\n"));
2659         state->status = RECLOCK_TIMEOUT;
2660 }
2661
2662 /* this is called when the child process has completed checking the reclock
2663    file and has written data back to us through the pipe.
2664 */
2665 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2666                              uint16_t flags, void *private_data)
2667 {
2668         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2669                                              struct ctdb_check_reclock_state);
2670         char c = 0;
2671         int ret;
2672
2673         /* we got a response from our child process so we can abort the
2674            timeout.
2675         */
2676         talloc_free(state->te);
2677         state->te = NULL;
2678
2679         ret = read(state->fd[0], &c, 1);
2680         if (ret != 1 || c != RECLOCK_OK) {
2681                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2682                 state->status = RECLOCK_FAILED;
2683
2684                 return;
2685         }
2686
2687         state->status = RECLOCK_OK;
2688         return;
2689 }
2690
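/*
  verify that the recovery master still holds the recovery lock by having a
  child process pread() one byte from the lock fd and report the result back
  through a pipe; a child that does not answer within 15 seconds is treated
  as a sign that the cluster filesystem is not granting locks.
*/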
2691 static int check_recovery_lock(struct ctdb_context *ctdb)
2692 {
2693         int ret;
2694         struct ctdb_check_reclock_state *state;
2695         pid_t parent = getpid();
2696
2697         if (ctdb->recovery_lock_fd == -1) {
2698                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2699                 return -1;
2700         }
2701
2702         state = talloc(ctdb, struct ctdb_check_reclock_state);
2703         CTDB_NO_MEMORY(ctdb, state);
2704
2705         state->ctdb = ctdb;
2706         state->start_time = timeval_current();
2707         state->status = RECLOCK_CHECKING;
2708         state->fd[0] = -1;
2709         state->fd[1] = -1;
2710
2711         ret = pipe(state->fd);
2712         if (ret != 0) {
2713                 talloc_free(state);
2714                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2715                 return -1;
2716         }
2717
2718         state->child = fork();
2719         if (state->child == (pid_t)-1) {
2720                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2721                 close(state->fd[0]);
2722                 state->fd[0] = -1;
2723                 close(state->fd[1]);
2724                 state->fd[1] = -1;
2725                 talloc_free(state);
2726                 return -1;
2727         }
2728
2729         if (state->child == 0) {
2730                 char cc = RECLOCK_OK;
2731                 close(state->fd[0]);
2732                 state->fd[0] = -1;
2733
2734                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2735                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2736                         cc = RECLOCK_FAILED;
2737                 }
2738
2739                 write(state->fd[1], &cc, 1);
2740                 /* make sure we die when our parent dies */
2741                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2742                         sleep(5);
2743                         write(state->fd[1], &cc, 1);
2744                 }
2745                 _exit(0);
2746         }
2747         close(state->fd[1]);
2748         state->fd[1] = -1;
2749         set_close_on_exec(state->fd[0]);
2750
2751         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2752
2753         talloc_set_destructor(state, check_reclock_destructor);
2754
2755         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2756                                     ctdb_check_reclock_timeout, state);
2757         if (state->te == NULL) {
2758                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2759                 talloc_free(state);
2760                 return -1;
2761         }
2762
2763         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2764                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2765                                 reclock_child_handler,
2766                                 (void *)state);
2767
2768         if (state->fde == NULL) {
2769                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2770                 talloc_free(state);
2771                 return -1;
2772         }
2773
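             /* run the event loop until the child's answer or the timed
                event takes us out of the RECLOCK_CHECKING state
             */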
2774         while (state->status == RECLOCK_CHECKING) {
2775                 event_loop_once(ctdb->ev);
2776         }
2777
2778         if (state->status == RECLOCK_FAILED) {
2779                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2780                 close(ctdb->recovery_lock_fd);
2781                 ctdb->recovery_lock_fd = -1;
2782                 talloc_free(state);
2783                 return -1;
2784         }
2785
2786         talloc_free(state);
2787         return 0;
2788 }
2789
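     /*
       fetch the current reclock file setting from the main daemon and
       update the recovery daemon's cached copy if it has changed
      */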
2790 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2791 {
2792         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2793         const char *reclockfile;
2794
2795         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2796                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2797                 talloc_free(tmp_ctx);
2798                 return -1;      
2799         }
2800
2801         if (reclockfile == NULL) {
2802                 if (ctdb->recovery_lock_file != NULL) {
2803                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2804                         talloc_free(ctdb->recovery_lock_file);
2805                         ctdb->recovery_lock_file = NULL;
2806                         if (ctdb->recovery_lock_fd != -1) {
2807                                 close(ctdb->recovery_lock_fd);
2808                                 ctdb->recovery_lock_fd = -1;
2809                         }
2810                 }
2811                 ctdb->tunable.verify_recovery_lock = 0;
2812                 talloc_free(tmp_ctx);
2813                 return 0;
2814         }
2815
2816         if (ctdb->recovery_lock_file == NULL) {
2817                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2818                 if (ctdb->recovery_lock_fd != -1) {
2819                         close(ctdb->recovery_lock_fd);
2820                         ctdb->recovery_lock_fd = -1;
2821                 }
2822                 talloc_free(tmp_ctx);
2823                 return 0;
2824         }
2825
2826
2827         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2828                 talloc_free(tmp_ctx);
2829                 return 0;
2830         }
2831
2832         talloc_free(ctdb->recovery_lock_file);
2833         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2834         ctdb->tunable.verify_recovery_lock = 0;
2835         if (ctdb->recovery_lock_fd != -1) {
2836                 close(ctdb->recovery_lock_fd);
2837                 ctdb->recovery_lock_fd = -1;
2838         }
2839
2840         talloc_free(tmp_ctx);
2841         return 0;
2842 }
2843
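     /*
       one iteration of the recovery daemon's monitoring work: refresh our
       view of the cluster from the main daemon, verify the recovery master
       and, if we hold that role, trigger elections, recoveries or takeover
       runs as needed
      */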
2844 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2845                       TALLOC_CTX *mem_ctx)
2846 {
2847         uint32_t pnn;
2848         struct ctdb_node_map *nodemap=NULL;
2849         struct ctdb_node_map *recmaster_nodemap=NULL;
2850         struct ctdb_node_map **remote_nodemaps=NULL;
2851         struct ctdb_vnn_map *vnnmap=NULL;
2852         struct ctdb_vnn_map *remote_vnnmap=NULL;
2853         int32_t debug_level;
2854         int i, j, ret;
2855
2856
2857
2858         /* verify that the main daemon is still running */
2859         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2860                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2861                 exit(-1);
2862         }
2863
2864         /* ping the local daemon to tell it we are alive */
2865         ctdb_ctrl_recd_ping(ctdb);
2866
2867         if (rec->election_timeout) {
2868                 /* an election is in progress */
2869                 return;
2870         }
2871
2872         /* read the debug level from the parent and update locally */
2873         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2874         if (ret != 0) {
2875                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2876                 return;
2877         }
2878         LogLevel = debug_level;
2879
2880
2881         /* We must check if we need to ban a node here but we want to do this
2882            as early as possible so we don't wait until we have pulled the node
2883            map from the local node. That's why we have the hardcoded value 20.
2884         */
2885         for (i=0; i<ctdb->num_nodes; i++) {
2886                 struct ctdb_banning_state *ban_state;
2887
2888                 if (ctdb->nodes[i]->ban_state == NULL) {
2889                         continue;
2890                 }
2891                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2892                 if (ban_state->count < 20) {
2893                         continue;
2894                 }
2895                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2896                         ctdb->nodes[i]->pnn, ban_state->count,
2897                         ctdb->tunable.recovery_ban_period));
2898                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2899                 ban_state->count = 0;
2900         }
2901
2902         /* get relevant tunables */
2903         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2904         if (ret != 0) {
2905                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2906                 return;
2907         }
2908
2909         /* get the current recovery lock file from the server */
2910         if (update_recovery_lock_file(ctdb) != 0) {
2911                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2912                 return;
2913         }
2914
2915         /* Make sure that if recovery lock verification becomes disabled,
2916            we close the file
2917         */
2918         if (ctdb->tunable.verify_recovery_lock == 0) {
2919                 if (ctdb->recovery_lock_fd != -1) {
2920                         close(ctdb->recovery_lock_fd);
2921                         ctdb->recovery_lock_fd = -1;
2922                 }
2923         }
2924
2925         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2926         if (pnn == (uint32_t)-1) {
2927                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2928                 return;
2929         }
2930
2931         /* get the vnnmap */
2932         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2933         if (ret != 0) {
2934                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2935                 return;
2936         }
2937
2938
2939         /* get number of nodes */
2940         if (rec->nodemap) {
2941                 talloc_free(rec->nodemap);
2942                 rec->nodemap = NULL;
2943                 nodemap=NULL;
2944         }
2945         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2946         if (ret != 0) {
2947                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2948                 return;
2949         }
2950         nodemap = rec->nodemap;
2951
2952         /* remember our own node flags */
2953         rec->node_flags = nodemap->nodes[pnn].flags;
2954
2955         /* update the capabilities for all nodes */
2956         ret = update_capabilities(ctdb, nodemap);
2957         if (ret != 0) {
2958                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2959                 return;
2960         }
2961
2962         /* check which node is the recovery master */
2963         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2964         if (ret != 0) {
2965                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2966                 return;
2967         }
2968
2969         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
2970         if (rec->recmaster != pnn) {
2971                 if (rec->ip_reallocate_ctx != NULL) {
2972                         talloc_free(rec->ip_reallocate_ctx);
2973                         rec->ip_reallocate_ctx = NULL;
2974                         rec->reallocate_callers = NULL;
2975                 }
2976         }
2977
2978         if (rec->recmaster == (uint32_t)-1) {
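             /* no recovery master is known yet, so force an election */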
2979                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2980                 force_election(rec, pnn, nodemap);
2981                 return;
2982         }
2983
2984         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
2985            also frozen and that the recmode is set to active.
2986         */
2987         if (nodemap->nodes[pnn].flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
2988                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2989                 if (ret != 0) {
2990                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
2991                 }
2992                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2993                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
2994
2995                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
2996                         if (ret != 0) {
2997                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
2998                                 return;
2999                         }
3000                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3001                         if (ret != 0) {
3002                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3003
3004                                 return;
3005                         }
3006                         return;
3007                 }
3008         }
3009         /* If the local node is inactive and we are the recmaster,
3010            force an election so that we yield the role
3011         */
3012         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE) && (rec->recmaster == pnn)) {
3013                 DEBUG(DEBUG_ERR,("Local node is INACTIVE. Yielding recmaster role\n"));
3014                 force_election(rec, pnn, nodemap);
3015                 return;
3016         }
3017         
3018         /*
3019          * if the current recmaster does not have CTDB_CAP_RECMASTER,
3020          * but we do, force an election and try to become the new
3021          * recmaster
3022          */
3023         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3024             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3025              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3026                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3027                                   " but we (node %u) have - force an election\n",
3028                                   rec->recmaster, pnn));
3029                 force_election(rec, pnn, nodemap);
3030                 return;
3031         }
3032
3033         /* count how many active nodes there are */
3034         rec->num_active    = 0;
3035         rec->num_connected = 0;
3036         for (i=0; i<nodemap->num; i++) {
3037                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3038                         rec->num_active++;
3039                 }
3040                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3041                         rec->num_connected++;
3042                 }
3043         }
3044
3045
3046         /* verify that the recmaster node is still active */
3047         for (j=0; j<nodemap->num; j++) {
3048                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3049                         break;
3050                 }
3051         }
3052
3053         if (j == nodemap->num) {
3054                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3055                 force_election(rec, pnn, nodemap);
3056                 return;
3057         }
3058
3059         /* if the recovery master is disconnected we must elect a new recmaster */
3060         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3061                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3062                 force_election(rec, pnn, nodemap);
3063                 return;
3064         }
3065
3066         /* grab the nodemap from the recovery master to check whether it is inactive (e.g. banned or stopped) */
3067         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3068                                    mem_ctx, &recmaster_nodemap);
3069         if (ret != 0) {
3070                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3071                           nodemap->nodes[j].pnn));
3072                 return;
3073         }
3074
3075
3076         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3077                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3078                 force_election(rec, pnn, nodemap);
3079                 return;
3080         }
3081
3082
3083         /* verify that we have all the ip addresses we should have and that we
3084          * don't have addresses we shouldn't have.
3085          */ 
3086         if (ctdb->do_checkpublicip) {
3087                 if (rec->ip_check_disable_ctx == NULL) {
3088                         if (verify_local_ip_allocation(ctdb, rec, pnn) != 0) {
3089                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3090                         }
3091                 }
3092         }
3093
3094
3095         /* if we are not the recmaster then we do not need to check
3096            if recovery is needed
3097          */
3098         if (pnn != rec->recmaster) {
3099                 return;
3100         }
3101
3102
3103         /* ensure our local copies of flags are right */
3104         ret = update_local_flags(rec, nodemap);
3105         if (ret == MONITOR_ELECTION_NEEDED) {
3106                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3107                 force_election(rec, pnn, nodemap);
3108                 return;
3109         }
3110         if (ret != MONITOR_OK) {
3111                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3112                 return;
3113         }
3114
3115         if (ctdb->num_nodes != nodemap->num) {
3116                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3117                 reload_nodes_file(ctdb);
3118                 return;
3119         }
3120
3121         /* verify that all active nodes agree that we are the recmaster */
3122         switch (verify_recmaster(rec, nodemap, pnn)) {
3123         case MONITOR_RECOVERY_NEEDED:
3124                 /* can not happen */
3125                 return;
3126         case MONITOR_ELECTION_NEEDED:
3127                 force_election(rec, pnn, nodemap);
3128                 return;
3129         case MONITOR_OK:
3130                 break;
3131         case MONITOR_FAILED:
3132                 return;
3133         }
3134
3135
3136         if (rec->need_recovery) {
3137                 /* a previous recovery didn't finish */
3138                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3139                 return;
3140         }
3141
3142         /* verify that all active nodes are in normal mode 
3143            and not in recovery mode 
3144         */
3145         switch (verify_recmode(ctdb, nodemap)) {
3146         case MONITOR_RECOVERY_NEEDED:
3147                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3148                 return;
3149         case MONITOR_FAILED:
3150                 return;
3151         case MONITOR_ELECTION_NEEDED:
3152                 /* can not happen */
3153         case MONITOR_OK:
3154                 break;
3155         }
3156
3157
3158         if (ctdb->tunable.verify_recovery_lock != 0) {
3159                 /* we should have the reclock - check that it is not stale */
3160                 ret = check_recovery_lock(ctdb);
3161                 if (ret != 0) {
3162                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3163                         ctdb_set_culprit(rec, ctdb->pnn);
3164                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3165                         return;
3166                 }
3167         }
3168
3169         /* if a takeover run has been requested, perform it and notify the waiters */
3170         if (rec->reallocate_callers) {
3171                 process_ipreallocate_requests(ctdb, rec);
3172         }
3173
3174         /* get the nodemap for all active remote nodes
3175          */
3176         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3177         if (remote_nodemaps == NULL) {
3178                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3179                 return;
3180         }
3181         for(i=0; i<nodemap->num; i++) {
3182                 remote_nodemaps[i] = NULL;
3183         }
3184         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3185                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3186                 return;
3187         } 
3188
3189         /* verify that all other nodes have the same nodemap as we have
3190         */
3191         for (j=0; j<nodemap->num; j++) {
3192                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3193                         continue;
3194                 }
3195
3196                 if (remote_nodemaps[j] == NULL) {
3197                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %u, restarting monitoring\n", nodemap->nodes[j].pnn));
3198                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3199
3200                         return;
3201                 }
3202
3203                 /* if the nodes disagree on how many nodes there are
3204                    then this is a good reason to try recovery
3205                  */
3206                 if (remote_nodemaps[j]->num != nodemap->num) {
3207                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3208                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3209                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3210                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3211                         return;
3212                 }
3213
3214                 /* if the nodes disagree on which nodes exist and are
3215                    active, then that is also a good reason to do recovery
3216                  */
3217                 for (i=0;i<nodemap->num;i++) {
3218                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3219                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3220                                           nodemap->nodes[j].pnn, i, 
3221                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3222                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3223                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3224                                             vnnmap);
3225                                 return;
3226                         }
3227                 }
3228
3229                 /* verify the flags are consistent
3230                 */
3231                 for (i=0; i<nodemap->num; i++) {
3232                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3233                                 continue;
3234                         }
3235                         
3236                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3237                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3238                                   nodemap->nodes[j].pnn, 
3239                                   nodemap->nodes[i].pnn, 
3240                                   remote_nodemaps[j]->nodes[i].flags,
3241                                   nodemap->nodes[i].flags));
3242                                 if (i == j) {
3243                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3244                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3245                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3246                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3247                                                     vnnmap);
3248                                         return;
3249                                 } else {
3250                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3251                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3252                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3253                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3254                                                     vnnmap);
3255                                         return;
3256                                 }
3257                         }
3258                 }
3259         }
3260
3261
3262         /* there had better be the same number of lmasters in the vnnmap
3263            as there are active nodes, or we will have to do a recovery
3264          */
3265         if (vnnmap->size != rec->num_active) {
3266                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3267                           vnnmap->size, rec->num_active));
3268                 ctdb_set_culprit(rec, ctdb->pnn);
3269                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3270                 return;
3271         }
3272
3273         /* verify that all active nodes in the nodemap also exist in 
3274            the vnnmap.
3275          */
3276         for (j=0; j<nodemap->num; j++) {
3277                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3278                         continue;
3279                 }
3280                 if (nodemap->nodes[j].pnn == pnn) {
3281                         continue;
3282                 }
3283
3284                 for (i=0; i<vnnmap->size; i++) {
3285                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3286                                 break;
3287                         }
3288                 }
3289                 if (i == vnnmap->size) {
3290                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3291                                   nodemap->nodes[j].pnn));
3292                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3293                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3294                         return;
3295                 }
3296         }
3297
3298         
3299         /* verify that all other nodes have the same vnnmap
3300            and are from the same generation
3301          */
3302         for (j=0; j<nodemap->num; j++) {
3303                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3304                         continue;
3305                 }
3306                 if (nodemap->nodes[j].pnn == pnn) {
3307                         continue;
3308                 }
3309
3310                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3311                                           mem_ctx, &remote_vnnmap);
3312                 if (ret != 0) {
3313                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3314                                   nodemap->nodes[j].pnn));
3315                         return;
3316                 }
3317
3318                 /* verify the vnnmap generation is the same */
3319                 if (vnnmap->generation != remote_vnnmap->generation) {
3320                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3321                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3322                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3323                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3324                         return;
3325                 }
3326
3327                 /* verify the vnnmap size is the same */
3328                 if (vnnmap->size != remote_vnnmap->size) {
3329                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3330                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3331                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3332                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3333                         return;
3334                 }
3335
3336                 /* verify the vnnmap is the same */
3337                 for (i=0;i<vnnmap->size;i++) {
3338                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3339                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3340                                           nodemap->nodes[j].pnn));
3341                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3342                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3343                                             vnnmap);
3344                                 return;
3345                         }
3346                 }
3347         }
3348
3349         /* we might need to change who has what IP assigned */
3350         if (rec->need_takeover_run) {
3351                 uint32_t culprit = (uint32_t)-1;
3352
3353                 rec->need_takeover_run = false;
3354
3355                 /* update the list of public ips that a node can handle for
3356                    all connected nodes
3357                 */
3358                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3359                 if (ret != 0) {
3360                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3361                                          culprit));
3362                         ctdb_set_culprit(rec, culprit);
3363                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3364                         return;
3365                 }
3366
3367                 /* execute the "startrecovery" event script on all nodes */
3368                 ret = run_startrecovery_eventscript(rec, nodemap);
3369                 if (ret!=0) {
3370                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3371                         ctdb_set_culprit(rec, ctdb->pnn);
3372                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3373                         return;
3374                 }
3375
3376                 ret = ctdb_takeover_run(ctdb, nodemap);
3377                 if (ret != 0) {
3378                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3379                         ctdb_set_culprit(rec, ctdb->pnn);
3380                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3381                         return;
3382                 }
3383
3384                 /* execute the "recovered" event script on all nodes */
3385                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3386 #if 0
3387 // we can't check whether the event completed successfully
3388 // since this script WILL fail if the node is in recovery mode
3389 // and if that race happens, the code here would just cause a second
3390 // cascading recovery.
3391                 if (ret!=0) {
3392                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3393                         ctdb_set_culprit(rec, ctdb->pnn);
3394                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3395                 }
3396 #endif
3397         }
3398 }
3399
3400 /*
3401   the main monitoring loop
3402  */
3403 static void monitor_cluster(struct ctdb_context *ctdb)
3404 {
3405         struct ctdb_recoverd *rec;
3406
3407         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3408
3409         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3410         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3411
3412         rec->ctdb = ctdb;
3413
3414         rec->priority_time = timeval_current();
3415
3416         /* register a message port for sending memory dumps */
3417         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3418
3419         /* register a message port for recovery elections */
3420         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3421
3422         /* when nodes are disabled/enabled */
3423         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3424
3425         /* when we are asked to push out a flag change */
3426         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3427
3428         /* register a message port for vacuum fetch */
3429         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3430
3431         /* register a message port for reloadnodes  */
3432         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3433
3434         /* register a message port for performing a takeover run */
3435         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3436
3437         /* register a message port for disabling the ip check for a short while */
3438         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3439
3440         /* register a message port for updating the recovery daemons node assignment for an ip */
3441         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3442
3443         for (;;) {
3444                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3445                 if (!mem_ctx) {
3446                         DEBUG(DEBUG_CRIT,(__location__
3447                                           " Failed to create temp context\n"));
3448                         exit(-1);
3449                 }
3450
3451                 main_loop(ctdb, rec, mem_ctx);
3452                 talloc_free(mem_ctx);
3453
3454                 /* we only check for recovery once every recover_interval seconds */
3455                 ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
3456         }
3457 }
3458
3459 /*
3460   event handler for when the main ctdbd dies
3461  */
3462 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3463                                  uint16_t flags, void *private_data)
3464 {
3465         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3466         _exit(1);
3467 }
3468
3469 /*
3470   called regularly to verify that the recovery daemon is still running
3471  */
3472 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3473                               struct timeval yt, void *p)
3474 {
3475         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3476
3477         if (kill(ctdb->recoverd_pid, 0) != 0) {
3478                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3479
3480                 ctdb_stop_recoverd(ctdb);
3481                 ctdb_stop_keepalive(ctdb);
3482                 ctdb_stop_monitoring(ctdb);
3483                 ctdb_release_all_ips(ctdb);
3484                 if (ctdb->methods != NULL) {
3485                         ctdb->methods->shutdown(ctdb);
3486                 }
3487                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3488
3489                 exit(10);       
3490         }
3491
3492         event_add_timed(ctdb->ev, ctdb, 
3493                         timeval_current_ofs(30, 0),
3494                         ctdb_check_recd, ctdb);
3495 }
3496
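     /*
       SIGCHLD handler for the recovery daemon: reap any exited child
       processes so that they do not linger as zombies
      */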
3497 static void recd_sig_child_handler(struct event_context *ev,
3498         struct signal_event *se, int signum, int count,
3499         void *dont_care, 
3500         void *private_data)
3501 {
3502 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3503         int status;
3504         pid_t pid = -1;
3505
3506         while (pid != 0) {
3507                 pid = waitpid(-1, &status, WNOHANG);
3508                 if (pid == -1) {
3509                         if (errno != ECHILD) {
3510                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3511                         }
3512                         return;
3513                 }
3514                 if (pid > 0) {
3515                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3516                 }
3517         }
3518 }
3519
3520 /*
3521   startup the recovery daemon as a child of the main ctdb daemon
3522  */
3523 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3524 {
3525         int fd[2];
3526         struct signal_event *se;
3527
3528         if (pipe(fd) != 0) {
3529                 return -1;
3530         }
3531
3532         ctdb->ctdbd_pid = getpid();
3533
3534         ctdb->recoverd_pid = fork();
3535         if (ctdb->recoverd_pid == -1) {
3536                 return -1;
3537         }
3538         
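             /* parent: keep the write end of the pipe open; when the main
                daemon exits the child sees EOF on the read end and terminates
             */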
3539         if (ctdb->recoverd_pid != 0) {
3540                 close(fd[0]);
3541                 event_add_timed(ctdb->ev, ctdb, 
3542                                 timeval_current_ofs(30, 0),
3543                                 ctdb_check_recd, ctdb);
3544                 return 0;
3545         }
3546
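             /* this is the child: it becomes the recovery daemon and watches
                the read end of the pipe so it can exit if the main daemon
                goes away
             */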
3547         close(fd[1]);
3548
3549         srandom(getpid() ^ time(NULL));
3550
3551         if (switch_from_server_to_client(ctdb) != 0) {
3552                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3553                 exit(1);
3554         }
3555
3556         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3557
3558         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3559                      ctdb_recoverd_parent, &fd[0]);     
3560
3561         /* set up a handler to pick up sigchld */
3562         se = event_add_signal(ctdb->ev, ctdb,
3563                                      SIGCHLD, 0,
3564                                      recd_sig_child_handler,
3565                                      ctdb);
3566         if (se == NULL) {
3567                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3568                 exit(1);
3569         }
3570
3571         monitor_cluster(ctdb);
3572
3573         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3574         return -1;
3575 }
3576
3577 /*
3578   shutdown the recovery daemon
3579  */
3580 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3581 {
3582         if (ctdb->recoverd_pid == 0) {
3583                 return;
3584         }
3585
3586         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3587         kill(ctdb->recoverd_pid, SIGTERM);
3588 }