recoverd: No need to check if node is recovery master when inactive
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
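/* Both timeouts below expand to absolute deadlines computed from the recovery
   tunables (recover_timeout and recover_interval), so every control sent from
   the recovery daemon and every monitoring round gets its deadline from the
   cluster configuration rather than from a hard-coded value. */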
70 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
71 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
72
73
74 /*
75   ban a node for a period of time
76  */
77 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
78 {
79         int ret;
80         struct ctdb_context *ctdb = rec->ctdb;
81         struct ctdb_ban_time bantime;
82        
83         if (!ctdb_validate_pnn(ctdb, pnn)) {
84                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
85                 return;
86         }
87
88         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
89
90         bantime.pnn  = pnn;
91         bantime.time = ban_time;
92
93         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
94         if (ret != 0) {
95                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
96                 return;
97         }
98
99 }
100
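/* Result of one monitoring pass. The names describe the required follow-up:
   MONITOR_OK (nothing to do), MONITOR_RECOVERY_NEEDED (start a recovery),
   MONITOR_ELECTION_NEEDED (force a new recmaster election) and MONITOR_FAILED
   (the check itself could not be completed). */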
101 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
102
103
104 /*
105   run the "recovered" eventscript on all nodes
106  */
107 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
108 {
109         TALLOC_CTX *tmp_ctx;
110         uint32_t *nodes;
111
112         tmp_ctx = talloc_new(ctdb);
113         CTDB_NO_MEMORY(ctdb, tmp_ctx);
114
115         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
116         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
117                                         nodes, 0,
118                                         CONTROL_TIMEOUT(), false, tdb_null,
119                                         NULL, NULL,
120                                         NULL) != 0) {
121                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
122
123                 talloc_free(tmp_ctx);
124                 return -1;
125         }
126
127         talloc_free(tmp_ctx);
128         return 0;
129 }
130
131 /*
132   remember the trouble maker
133  */
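/* Culprit tracking is a simple credit scheme: every time a node misbehaves it
   is charged a number of credits here, and credits older than
   ctdb->tunable.recovery_grace_period are forgiven.  do_recovery() later bans
   any node that has accumulated 2 * num_nodes credits for
   ctdb->tunable.recovery_ban_period seconds. */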
134 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
135 {
136         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
137         struct ctdb_banning_state *ban_state;
138
139         if (culprit >= ctdb->num_nodes) {
140                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
141                 return;
142         }
143
144         /* If we are banned or stopped, do not set other nodes as culprits */
145         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
146                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
147                 return;
148         }
149
150         if (ctdb->nodes[culprit]->ban_state == NULL) {
151                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
152                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
153
154                 
155         }
156         ban_state = ctdb->nodes[culprit]->ban_state;
157         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
158                 /* this was the first time in a long while this node
159                    misbehaved so we will forgive any old transgressions.
160                 */
161                 ban_state->count = 0;
162         }
163
164         ban_state->count += count;
165         ban_state->last_reported_time = timeval_current();
166         rec->last_culprit_node = culprit;
167 }
168
169 /*
170   remember the trouble maker
171  */
172 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
173 {
174         ctdb_set_culprit_count(rec, culprit, 1);
175 }
176
177
178 /* this callback is called for every node that failed to execute the
179    start recovery event
180 */
181 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
182 {
183         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
184
185         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
186
187         ctdb_set_culprit(rec, node_pnn);
188 }
189
190 /*
191   run the "startrecovery" eventscript on all nodes
192  */
193 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
194 {
195         TALLOC_CTX *tmp_ctx;
196         uint32_t *nodes;
197         struct ctdb_context *ctdb = rec->ctdb;
198
199         tmp_ctx = talloc_new(ctdb);
200         CTDB_NO_MEMORY(ctdb, tmp_ctx);
201
202         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
203         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
204                                         nodes, 0,
205                                         CONTROL_TIMEOUT(), false, tdb_null,
206                                         NULL,
207                                         startrecovery_fail_callback,
208                                         rec) != 0) {
209                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
210                 talloc_free(tmp_ctx);
211                 return -1;
212         }
213
214         talloc_free(tmp_ctx);
215         return 0;
216 }
217
218 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
219 {
220         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
221                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
222                 return;
223         }
224         if (node_pnn < ctdb->num_nodes) {
225                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
226         }
227
228         if (node_pnn == ctdb->pnn) {
229                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
230         }
231 }
232
233 /*
234   update the node capabilities for all connected nodes
235  */
236 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
237 {
238         uint32_t *nodes;
239         TALLOC_CTX *tmp_ctx;
240
241         tmp_ctx = talloc_new(ctdb);
242         CTDB_NO_MEMORY(ctdb, tmp_ctx);
243
244         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
245         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
246                                         nodes, 0,
247                                         CONTROL_TIMEOUT(),
248                                         false, tdb_null,
249                                         async_getcap_callback, NULL,
250                                         NULL) != 0) {
251                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
252                 talloc_free(tmp_ctx);
253                 return -1;
254         }
255
256         talloc_free(tmp_ctx);
257         return 0;
258 }
259
260 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
261 {
262         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
263
264         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
265         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
266 }
267
268 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
269 {
270         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
271
272         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
273         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
274 }
275
276 /*
277   change recovery mode on all nodes
278  */
279 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
280 {
281         TDB_DATA data;
282         uint32_t *nodes;
283         TALLOC_CTX *tmp_ctx;
284
285         tmp_ctx = talloc_new(ctdb);
286         CTDB_NO_MEMORY(ctdb, tmp_ctx);
287
288         /* freeze all nodes */
289         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
290         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
291                 int i;
292
293                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
294                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
295                                                 nodes, i,
296                                                 CONTROL_TIMEOUT(),
297                                                 false, tdb_null,
298                                                 NULL,
299                                                 set_recmode_fail_callback,
300                                                 rec) != 0) {
301                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
302                                 talloc_free(tmp_ctx);
303                                 return -1;
304                         }
305                 }
306         }
307
308
309         data.dsize = sizeof(uint32_t);
310         data.dptr = (unsigned char *)&rec_mode;
311
312         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
313                                         nodes, 0,
314                                         CONTROL_TIMEOUT(),
315                                         false, data,
316                                         NULL, NULL,
317                                         NULL) != 0) {
318                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
319                 talloc_free(tmp_ctx);
320                 return -1;
321         }
322
323         talloc_free(tmp_ctx);
324         return 0;
325 }
326
327 /*
328   change recovery master on all nodes
329  */
330 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
331 {
332         TDB_DATA data;
333         TALLOC_CTX *tmp_ctx;
334         uint32_t *nodes;
335
336         tmp_ctx = talloc_new(ctdb);
337         CTDB_NO_MEMORY(ctdb, tmp_ctx);
338
339         data.dsize = sizeof(uint32_t);
340         data.dptr = (unsigned char *)&pnn;
341
342         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
343         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
344                                         nodes, 0,
345                                         CONTROL_TIMEOUT(), false, data,
346                                         NULL, NULL,
347                                         NULL) != 0) {
348                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
349                 talloc_free(tmp_ctx);
350                 return -1;
351         }
352
353         talloc_free(tmp_ctx);
354         return 0;
355 }
356
357 /* update all remote nodes to use the same db priority that we have
358    this can fail if the remote node has not yet been upgraded to 
359    support this function, so we always return success and never fail
360    a recovery if this call fails.
361 */
362 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
363         struct ctdb_node_map *nodemap, 
364         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
365 {
366         int db;
367         uint32_t *nodes;
368
369         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
370
371         /* step through all local databases */
372         for (db=0; db<dbmap->num;db++) {
373                 TDB_DATA data;
374                 struct ctdb_db_priority db_prio;
375                 int ret;
376
377                 db_prio.db_id     = dbmap->dbs[db].dbid;
378                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
379                 if (ret != 0) {
380                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
381                         continue;
382                 }
383
384                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
385
386                 data.dptr  = (uint8_t *)&db_prio;
387                 data.dsize = sizeof(db_prio);
388
389                 if (ctdb_client_async_control(ctdb,
390                                         CTDB_CONTROL_SET_DB_PRIORITY,
391                                         nodes, 0,
392                                         CONTROL_TIMEOUT(), false, data,
393                                         NULL, NULL,
394                                         NULL) != 0) {
395                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
396                 }
397         }
398
399         return 0;
400 }                       
401
402 /*
403   ensure all other nodes have attached to any databases that we have
404  */
405 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
406                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
407 {
408         int i, j, db, ret;
409         struct ctdb_dbid_map *remote_dbmap;
410
411         /* verify that all other nodes have all our databases */
412         for (j=0; j<nodemap->num; j++) {
413                 /* we don't need to check ourselves */
414                 if (nodemap->nodes[j].pnn == pnn) {
415                         continue;
416                 }
417                 /* don't check nodes that are unavailable */
418                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
419                         continue;
420                 }
421
422                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
423                                          mem_ctx, &remote_dbmap);
424                 if (ret != 0) {
425                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
426                         return -1;
427                 }
428
429                 /* step through all local databases */
430                 for (db=0; db<dbmap->num;db++) {
431                         const char *name;
432
433
434                         for (i=0;i<remote_dbmap->num;i++) {
435                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
436                                         break;
437                                 }
438                         }
439                         /* the remote node already has this database */
440                         if (i!=remote_dbmap->num) {
441                                 continue;
442                         }
443                         /* ok so we need to create this database */
444                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
445                                             mem_ctx, &name);
446                         if (ret != 0) {
447                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
448                                 return -1;
449                         }
450                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
451                                            mem_ctx, name, dbmap->dbs[db].persistent);
452                         if (ret != 0) {
453                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
454                                 return -1;
455                         }
456                 }
457         }
458
459         return 0;
460 }
461
462
463 /*
464   ensure we are attached to any databases that anyone else is attached to
465  */
466 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
467                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
468 {
469         int i, j, db, ret;
470         struct ctdb_dbid_map *remote_dbmap;
471
472         /* verify that we have all the databases any other node has */
473         for (j=0; j<nodemap->num; j++) {
474                 /* we don't need to check ourselves */
475                 if (nodemap->nodes[j].pnn == pnn) {
476                         continue;
477                 }
478                 /* don't check nodes that are unavailable */
479                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
480                         continue;
481                 }
482
483                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
484                                          mem_ctx, &remote_dbmap);
485                 if (ret != 0) {
486                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
487                         return -1;
488                 }
489
490                 /* step through all databases on the remote node */
491                 for (db=0; db<remote_dbmap->num;db++) {
492                         const char *name;
493
494                         for (i=0;i<(*dbmap)->num;i++) {
495                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
496                                         break;
497                                 }
498                         }
499                         /* we already have this db locally */
500                         if (i!=(*dbmap)->num) {
501                                 continue;
502                         }
503                         /* ok so we need to create this database and
504                            rebuild dbmap
505                          */
506                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
507                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
508                         if (ret != 0) {
509                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
510                                           nodemap->nodes[j].pnn));
511                                 return -1;
512                         }
513                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
514                                            remote_dbmap->dbs[db].persistent);
515                         if (ret != 0) {
516                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
517                                 return -1;
518                         }
519                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
520                         if (ret != 0) {
521                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
522                                 return -1;
523                         }
524                 }
525         }
526
527         return 0;
528 }
529
530
531 /*
532   pull the remote database contents from one node into the recdb
533  */
534 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
535                                     struct tdb_wrap *recdb, uint32_t dbid,
536                                     bool persistent)
537 {
538         int ret;
539         TDB_DATA outdata;
540         struct ctdb_marshall_buffer *reply;
541         struct ctdb_rec_data *rec;
542         int i;
543         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
544
545         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
546                                CONTROL_TIMEOUT(), &outdata);
547         if (ret != 0) {
548                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
549                 talloc_free(tmp_ctx);
550                 return -1;
551         }
552
553         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
554
555         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
556                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
557                 talloc_free(tmp_ctx);
558                 return -1;
559         }
560         
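        /* The reply is a ctdb_marshall_buffer: reply->count ctdb_rec_data
           records packed back to back.  Each record carries its own total
           length, so the loop advances rec by rec->length bytes to reach the
           next record. */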
561         rec = (struct ctdb_rec_data *)&reply->data[0];
562         
563         for (i=0;
564              i<reply->count;
565              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
566                 TDB_DATA key, data;
567                 struct ctdb_ltdb_header *hdr;
568                 TDB_DATA existing;
569                 
570                 key.dptr = &rec->data[0];
571                 key.dsize = rec->keylen;
572                 data.dptr = &rec->data[key.dsize];
573                 data.dsize = rec->datalen;
574                 
575                 hdr = (struct ctdb_ltdb_header *)data.dptr;
576
577                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
578                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
579                         talloc_free(tmp_ctx);
580                         return -1;
581                 }
582
583                 /* fetch the existing record, if any */
584                 existing = tdb_fetch(recdb->tdb, key);
585                 
586                 if (existing.dptr != NULL) {
587                         struct ctdb_ltdb_header header;
588                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
589                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
590                                          (unsigned)existing.dsize, srcnode));
591                                 free(existing.dptr);
592                                 talloc_free(tmp_ctx);
593                                 return -1;
594                         }
595                         header = *(struct ctdb_ltdb_header *)existing.dptr;
596                         free(existing.dptr);
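                        /* Merge rule: only replace the copy already in the
                           recdb if the incoming record has a strictly newer
                           rsn, or an equal rsn while the existing copy's
                           dmaster is not the current recovery master;
                           otherwise keep what we already have. */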
597                         if (!(header.rsn < hdr->rsn ||
598                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
599                                 continue;
600                         }
601                 }
602                 
603                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
604                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
605                         talloc_free(tmp_ctx);
606                         return -1;                              
607                 }
608         }
609
610         talloc_free(tmp_ctx);
611
612         return 0;
613 }
614
615 /*
616   pull all the remote database contents into the recdb
617  */
618 static int pull_remote_database(struct ctdb_context *ctdb,
619                                 struct ctdb_recoverd *rec, 
620                                 struct ctdb_node_map *nodemap, 
621                                 struct tdb_wrap *recdb, uint32_t dbid,
622                                 bool persistent)
623 {
624         int j;
625
626         /* pull all records from all other nodes across onto this node
627            (this merges based on rsn)
628         */
629         for (j=0; j<nodemap->num; j++) {
630                 /* don't merge from nodes that are unavailable */
631                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
632                         continue;
633                 }
634                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
635                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
636                                  nodemap->nodes[j].pnn));
637                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
638                         return -1;
639                 }
640         }
641         
642         return 0;
643 }
644
645
646 /*
647   update flags on all active nodes
648  */
649 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
650 {
651         int ret;
652
653         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
654         if (ret != 0) {
655                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
656                 return -1;
657         }
658
659         return 0;
660 }
661
662 /*
663   ensure all nodes have the same vnnmap we do
664  */
665 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
666                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
667 {
668         int j, ret;
669
670         /* push the new vnn map out to all the nodes */
671         for (j=0; j<nodemap->num; j++) {
672                 /* don't push to nodes that are unavailable */
673                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
674                         continue;
675                 }
676
677                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
678                 if (ret != 0) {
679                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
680                         return -1;
681                 }
682         }
683
684         return 0;
685 }
686
687
688 struct vacuum_info {
689         struct vacuum_info *next, *prev;
690         struct ctdb_recoverd *rec;
691         uint32_t srcnode;
692         struct ctdb_db_context *ctdb_db;
693         struct ctdb_marshall_buffer *recs;
694         struct ctdb_rec_data *r;
695 };
696
697 static void vacuum_fetch_next(struct vacuum_info *v);
698
699 /*
700   called when a vacuum fetch has completed - just free it and do the next one
701  */
702 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
703 {
704         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
705         talloc_free(state);
706         vacuum_fetch_next(v);
707 }
708
709
710 /*
711   process the next element from the vacuum list
712 */
713 static void vacuum_fetch_next(struct vacuum_info *v)
714 {
715         struct ctdb_call call;
716         struct ctdb_rec_data *r;
717
718         while (v->recs->count) {
719                 struct ctdb_client_call_state *state;
720                 TDB_DATA data;
721                 struct ctdb_ltdb_header *hdr;
722
723                 ZERO_STRUCT(call);
724                 call.call_id = CTDB_NULL_FUNC;
725                 call.flags = CTDB_IMMEDIATE_MIGRATION;
726                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
727
728                 r = v->r;
729                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
730                 v->recs->count--;
731
732                 call.key.dptr = &r->data[0];
733                 call.key.dsize = r->keylen;
734
735                 /* ensure we don't block this daemon - just skip a record if we can't get
736                    the chainlock */
737                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
738                         continue;
739                 }
740
741                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
742                 if (data.dptr == NULL) {
743                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
744                         continue;
745                 }
746
747                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
748                         free(data.dptr);
749                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
750                         continue;
751                 }
752                 
753                 hdr = (struct ctdb_ltdb_header *)data.dptr;
754                 if (hdr->dmaster == v->rec->ctdb->pnn) {
755                         /* its already local */
756                         free(data.dptr);
757                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
758                         continue;
759                 }
760
761                 free(data.dptr);
762
763                 state = ctdb_call_send(v->ctdb_db, &call);
764                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
765                 if (state == NULL) {
766                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
767                         talloc_free(v);
768                         return;
769                 }
770                 state->async.fn = vacuum_fetch_callback;
771                 state->async.private_data = v;
772                 return;
773         }
774
775         talloc_free(v);
776 }
777
778
779 /*
780   destroy a vacuum info structure
781  */
782 static int vacuum_info_destructor(struct vacuum_info *v)
783 {
784         DLIST_REMOVE(v->rec->vacuum_info, v);
785         return 0;
786 }
787
788
789 /*
790   handler for vacuum fetch
791 */
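/* Another node sends us a ctdb_marshall_buffer of records it would like moved
   off itself (srcnode is carried in r->reqid).  We look the database up,
   attach to it, remember the batch in a vacuum_info and then migrate the
   records to this node one at a time through vacuum_fetch_next(), skipping
   any record we are already the dmaster for. */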
792 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
793                                  TDB_DATA data, void *private_data)
794 {
795         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
796         struct ctdb_marshall_buffer *recs;
797         int ret, i;
798         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
799         const char *name;
800         struct ctdb_dbid_map *dbmap=NULL;
801         bool persistent = false;
802         struct ctdb_db_context *ctdb_db;
803         struct ctdb_rec_data *r;
804         uint32_t srcnode;
805         struct vacuum_info *v;
806
807         recs = (struct ctdb_marshall_buffer *)data.dptr;
808         r = (struct ctdb_rec_data *)&recs->data[0];
809
810         if (recs->count == 0) {
811                 talloc_free(tmp_ctx);
812                 return;
813         }
814
815         srcnode = r->reqid;
816
817         for (v=rec->vacuum_info;v;v=v->next) {
818                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
819                         /* we're already working on records from this node */
820                         talloc_free(tmp_ctx);
821                         return;
822                 }
823         }
824
825         /* work out if the database is persistent */
826         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
827         if (ret != 0) {
828                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
829                 talloc_free(tmp_ctx);
830                 return;
831         }
832
833         for (i=0;i<dbmap->num;i++) {
834                 if (dbmap->dbs[i].dbid == recs->db_id) {
835                         persistent = dbmap->dbs[i].persistent;
836                         break;
837                 }
838         }
839         if (i == dbmap->num) {
840                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
841                 talloc_free(tmp_ctx);
842                 return;         
843         }
844
845         /* find the name of this database */
846         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
847                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
848                 talloc_free(tmp_ctx);
849                 return;
850         }
851
852         /* attach to it */
853         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
854         if (ctdb_db == NULL) {
855                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
856                 talloc_free(tmp_ctx);
857                 return;
858         }
859
860         v = talloc_zero(rec, struct vacuum_info);
861         if (v == NULL) {
862                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
863                 talloc_free(tmp_ctx);
864                 return;
865         }
866
867         v->rec = rec;
868         v->srcnode = srcnode;
869         v->ctdb_db = ctdb_db;
870         v->recs = talloc_memdup(v, recs, data.dsize);
871         if (v->recs == NULL) {
872                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
873                 talloc_free(v);
874                 talloc_free(tmp_ctx);
875                 return;         
876         }
877         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
878
879         DLIST_ADD(rec->vacuum_info, v);
880
881         talloc_set_destructor(v, vacuum_info_destructor);
882
883         vacuum_fetch_next(v);
884         talloc_free(tmp_ctx);
885 }
886
887
888 /*
889   called when ctdb_wait_timeout should finish
890  */
891 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
892                               struct timeval yt, void *p)
893 {
894         uint32_t *timed_out = (uint32_t *)p;
895         (*timed_out) = 1;
896 }
897
898 /*
899   wait for a given number of seconds
900  */
901 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
902 {
903         uint32_t timed_out = 0;
904         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
905         while (!timed_out) {
906                 event_loop_once(ctdb->ev);
907         }
908 }
909
910 /*
911   called when an election times out (ends)
912  */
913 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
914                                   struct timeval t, void *p)
915 {
916         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
917         rec->election_timeout = NULL;
918
919         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
920 }
921
922
923 /*
924   wait for an election to finish. It finished election_timeout seconds after
925   wait for an election to finish. It finishes election_timeout seconds after
926  */
927 static void ctdb_wait_election(struct ctdb_recoverd *rec)
928 {
929         struct ctdb_context *ctdb = rec->ctdb;
930         while (rec->election_timeout) {
931                 event_loop_once(ctdb->ev);
932         }
933 }
934
935 /*
936   Update our local flags from all connected remote nodes. 
937   This is only run when we are, or believe we are, the recovery master
938  */
939 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
940 {
941         int j;
942         struct ctdb_context *ctdb = rec->ctdb;
943         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
944
945         /* get the nodemap for all active remote nodes and verify
946            they are the same as for this node
947          */
948         for (j=0; j<nodemap->num; j++) {
949                 struct ctdb_node_map *remote_nodemap=NULL;
950                 int ret;
951
952                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
953                         continue;
954                 }
955                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
956                         continue;
957                 }
958
959                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
960                                            mem_ctx, &remote_nodemap);
961                 if (ret != 0) {
962                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
963                                   nodemap->nodes[j].pnn));
964                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
965                         talloc_free(mem_ctx);
966                         return MONITOR_FAILED;
967                 }
968                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
969                         /* We should tell our daemon about this so it
970                            updates its flags or else we will log the same 
971                            message again in the next iteration of recovery.
972                            Since we are the recovery master we can just as
973                            well update the flags on all nodes.
974                         */
975                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
976                         if (ret != 0) {
977                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                                talloc_free(mem_ctx);
978                                 return MONITOR_FAILED;
979                         }
980
981                         /* Update our local copy of the flags in the recovery
982                            daemon.
983                         */
984                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
985                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
986                                  nodemap->nodes[j].flags));
987                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
988                 }
989                 talloc_free(remote_nodemap);
990         }
991         talloc_free(mem_ctx);
992         return MONITOR_OK;
993 }
994
995
996 /* Create a new random generation id. 
997    The generation id cannot be INVALID_GENERATION.
998 */
999 static uint32_t new_generation(void)
1000 {
1001         uint32_t generation;
1002
1003         while (1) {
1004                 generation = random();
1005
1006                 if (generation != INVALID_GENERATION) {
1007                         break;
1008                 }
1009         }
1010
1011         return generation;
1012 }
1013
1014
1015 /*
1016   create a temporary working database
1017  */
1018 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1019 {
1020         char *name;
1021         struct tdb_wrap *recdb;
1022         unsigned tdb_flags;
1023
1024         /* open up the temporary recovery database */
1025         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1026                                ctdb->db_directory_state,
1027                                ctdb->pnn);
1028         if (name == NULL) {
1029                 return NULL;
1030         }
1031         unlink(name);
1032
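        /* The recdb is a private scratch database used only by this recovery
           daemon, so tdb locking appears safe to skip (TDB_NOLOCK); mmap is
           avoided when running under valgrind, and nested transactions are
           disallowed. */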
1033         tdb_flags = TDB_NOLOCK;
1034         if (ctdb->valgrinding) {
1035                 tdb_flags |= TDB_NOMMAP;
1036         }
1037         tdb_flags |= TDB_DISALLOW_NESTING;
1038
1039         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1040                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1041         if (recdb == NULL) {
1042                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1043         }
1044
1045         talloc_free(name);
1046
1047         return recdb;
1048 }
1049
1050
1051 /* 
1052    a traverse function for pulling all relevant records from recdb
1053  */
1054 struct recdb_data {
1055         struct ctdb_context *ctdb;
1056         struct ctdb_marshall_buffer *recdata;
1057         uint32_t len;
1058         bool failed;
1059         bool persistent;
1060 };
1061
1062 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1063 {
1064         struct recdb_data *params = (struct recdb_data *)p;
1065         struct ctdb_rec_data *rec;
1066         struct ctdb_ltdb_header *hdr;
1067
1068         /* skip empty records */
1069         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1070                 return 0;
1071         }
1072
1073         /* update the dmaster field to point to us */
1074         hdr = (struct ctdb_ltdb_header *)data.dptr;
1075         if (!params->persistent) {
1076                 hdr->dmaster = params->ctdb->pnn;
1077                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1078         }
1079
1080         /* add the record to the blob ready to send to the nodes */
1081         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1082         if (rec == NULL) {
1083                 params->failed = true;
1084                 return -1;
1085         }
1086         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1087         if (params->recdata == NULL) {
1088                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1089                          rec->length + params->len));
1090                 params->failed = true;
1091                 return -1;
1092         }
1093         params->recdata->count++;
1094         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1095         params->len += rec->length;
1096         talloc_free(rec);
1097
1098         return 0;
1099 }
1100
1101 /*
1102   push the recdb database out to all nodes
1103  */
1104 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1105                                bool persistent,
1106                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1107 {
1108         struct recdb_data params;
1109         struct ctdb_marshall_buffer *recdata;
1110         TDB_DATA outdata;
1111         TALLOC_CTX *tmp_ctx;
1112         uint32_t *nodes;
1113
1114         tmp_ctx = talloc_new(ctdb);
1115         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1116
1117         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1118         CTDB_NO_MEMORY(ctdb, recdata);
1119
1120         recdata->db_id = dbid;
1121
1122         params.ctdb = ctdb;
1123         params.recdata = recdata;
1124         params.len = offsetof(struct ctdb_marshall_buffer, data);
1125         params.failed = false;
1126         params.persistent = persistent;
1127
1128         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1129                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1130                 talloc_free(params.recdata);
1131                 talloc_free(tmp_ctx);
1132                 return -1;
1133         }
1134
1135         if (params.failed) {
1136                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1137                 talloc_free(params.recdata);
1138                 talloc_free(tmp_ctx);
1139                 return -1;              
1140         }
1141
1142         recdata = params.recdata;
1143
1144         outdata.dptr = (void *)recdata;
1145         outdata.dsize = params.len;
1146
1147         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1148         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1149                                         nodes, 0,
1150                                         CONTROL_TIMEOUT(), false, outdata,
1151                                         NULL, NULL,
1152                                         NULL) != 0) {
1153                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1154                 talloc_free(recdata);
1155                 talloc_free(tmp_ctx);
1156                 return -1;
1157         }
1158
1159         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x containing %u records\n", 
1160                   dbid, recdata->count));
1161
1162         talloc_free(recdata);
1163         talloc_free(tmp_ctx);
1164
1165         return 0;
1166 }
1167
1168
1169 /*
1170   go through a full recovery on one database 
1171  */
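/* In outline: pull every active node's copy of the database into a freshly
   created recdb (merging on rsn), wipe the database on all active nodes
   inside the already started transaction, and push the merged recdb contents
   back out to every node, with the dmaster of non-persistent records
   rewritten to point at the recovery master. */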
1172 static int recover_database(struct ctdb_recoverd *rec, 
1173                             TALLOC_CTX *mem_ctx,
1174                             uint32_t dbid,
1175                             bool persistent,
1176                             uint32_t pnn, 
1177                             struct ctdb_node_map *nodemap,
1178                             uint32_t transaction_id)
1179 {
1180         struct tdb_wrap *recdb;
1181         int ret;
1182         struct ctdb_context *ctdb = rec->ctdb;
1183         TDB_DATA data;
1184         struct ctdb_control_wipe_database w;
1185         uint32_t *nodes;
1186
1187         recdb = create_recdb(ctdb, mem_ctx);
1188         if (recdb == NULL) {
1189                 return -1;
1190         }
1191
1192         /* pull all remote databases onto the recdb */
1193         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1194         if (ret != 0) {
1195                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1196                 return -1;
1197         }
1198
1199         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1200
1201         /* wipe all the remote databases. This is safe as we are in a transaction */
1202         w.db_id = dbid;
1203         w.transaction_id = transaction_id;
1204
1205         data.dptr = (void *)&w;
1206         data.dsize = sizeof(w);
1207
1208         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1209         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1210                                         nodes, 0,
1211                                         CONTROL_TIMEOUT(), false, data,
1212                                         NULL, NULL,
1213                                         NULL) != 0) {
1214                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1215                 talloc_free(recdb);
1216                 return -1;
1217         }
1218         
1219         /* push out the correct database. This sets the dmaster and skips 
1220            the empty records */
1221         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1222         if (ret != 0) {
1223                 talloc_free(recdb);
1224                 return -1;
1225         }
1226
1227         /* all done with this database */
1228         talloc_free(recdb);
1229
1230         return 0;
1231 }
1232
1233 /*
1234   reload the nodes file 
1235 */
1236 static void reload_nodes_file(struct ctdb_context *ctdb)
1237 {
1238         ctdb->nodes = NULL;
1239         ctdb_load_nodes_file(ctdb);
1240 }
1241
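/*
  refresh the cached lists of known and available public ips for every active
  node, flag that a takeover run is needed if any node reports an inconsistent
  allocation, and on failure report the offending node through *culprit
 */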
1242 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1243                                          struct ctdb_recoverd *rec,
1244                                          struct ctdb_node_map *nodemap,
1245                                          uint32_t *culprit)
1246 {
1247         int j;
1248         int ret;
1249
1250         if (ctdb->num_nodes != nodemap->num) {
1251                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1252                                   ctdb->num_nodes, nodemap->num));
1253                 if (culprit) {
1254                         *culprit = ctdb->pnn;
1255                 }
1256                 return -1;
1257         }
1258
1259         for (j=0; j<nodemap->num; j++) {
1260                 /* release any existing data */
1261                 if (ctdb->nodes[j]->known_public_ips) {
1262                         talloc_free(ctdb->nodes[j]->known_public_ips);
1263                         ctdb->nodes[j]->known_public_ips = NULL;
1264                 }
1265                 if (ctdb->nodes[j]->available_public_ips) {
1266                         talloc_free(ctdb->nodes[j]->available_public_ips);
1267                         ctdb->nodes[j]->available_public_ips = NULL;
1268                 }
1269
1270                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1271                         continue;
1272                 }
1273
1274                 /* grab a new shiny list of public ips from the node */
1275                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1276                                         CONTROL_TIMEOUT(),
1277                                         ctdb->nodes[j]->pnn,
1278                                         ctdb->nodes,
1279                                         0,
1280                                         &ctdb->nodes[j]->known_public_ips);
1281                 if (ret != 0) {
1282                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1283                                 ctdb->nodes[j]->pnn));
1284                         if (culprit) {
1285                                 *culprit = ctdb->nodes[j]->pnn;
1286                         }
1287                         return -1;
1288                 }
1289
1290                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1291                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1292                         rec->need_takeover_run = true;
1293                 }
1294
1295                 /* grab a new shiny list of public ips from the node */
1296                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1297                                         CONTROL_TIMEOUT(),
1298                                         ctdb->nodes[j]->pnn,
1299                                         ctdb->nodes,
1300                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1301                                         &ctdb->nodes[j]->available_public_ips);
1302                 if (ret != 0) {
1303                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1304                                 ctdb->nodes[j]->pnn));
1305                         if (culprit) {
1306                                 *culprit = ctdb->nodes[j]->pnn;
1307                         }
1308                         return -1;
1309                 }
1310         }
1311
1312         return 0;
1313 }
1314
1315 /*
1316   we are the recmaster, and recovery is needed - start a recovery run
1317  */
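/* Outline of the part of the recovery run shown here: ban any node that has
   collected too many culprit credits, take the recovery lock when the
   verify_recovery_lock tunable is set, make sure every node has every
   database, switch the cluster into active recovery mode (freezing all
   databases), run the "startrecovery" event, push our view of the node flags
   to all nodes, pick a new generation number and then start a cluster-wide
   transaction on all active nodes. */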
1318 static int do_recovery(struct ctdb_recoverd *rec, 
1319                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1320                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1321 {
1322         struct ctdb_context *ctdb = rec->ctdb;
1323         int i, j, ret;
1324         uint32_t generation;
1325         struct ctdb_dbid_map *dbmap;
1326         TDB_DATA data;
1327         uint32_t *nodes;
1328         struct timeval start_time;
1329         uint32_t culprit = (uint32_t)-1;
1330
1331         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1332
1333         /* if recovery fails, force it again */
1334         rec->need_recovery = true;
1335
1336         for (i=0; i<ctdb->num_nodes; i++) {
1337                 struct ctdb_banning_state *ban_state;
1338
1339                 if (ctdb->nodes[i]->ban_state == NULL) {
1340                         continue;
1341                 }
1342                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1343                 if (ban_state->count < 2*ctdb->num_nodes) {
1344                         continue;
1345                 }
1346                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1347                         ctdb->nodes[i]->pnn, ban_state->count,
1348                         ctdb->tunable.recovery_ban_period));
1349                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1350                 ban_state->count = 0;
1351         }
1352
1353
1354         if (ctdb->tunable.verify_recovery_lock != 0) {
1355                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1356                 start_time = timeval_current();
1357                 if (!ctdb_recovery_lock(ctdb, true)) {
1358                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1359                                          "and ban ourself for %u seconds\n",
1360                                          ctdb->tunable.recovery_ban_period));
1361                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1362                         return -1;
1363                 }
1364                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1365                 DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1366         }
1367
1368         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1369
1370         /* get a list of all databases */
1371         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1372         if (ret != 0) {
1373                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1374                 return -1;
1375         }
1376
1377         /* we do the db creation before we set the recovery mode, so the freeze happens
1378            on all databases we will be dealing with. */
1379
1380         /* verify that we have all the databases any other node has */
1381         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1382         if (ret != 0) {
1383                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1384                 return -1;
1385         }
1386
1387         /* verify that all other nodes have all our databases */
1388         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1389         if (ret != 0) {
1390                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1391                 return -1;
1392         }
1393         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1394
1395         /* update the database priority for all remote databases */
1396         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1397         if (ret != 0) {
1398                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1399         }
1400         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1401
1402
1403         /* set recovery mode to active on all nodes */
1404         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1405         if (ret != 0) {
1406                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1407                 return -1;
1408         }
1409
1410         /* execute the "startrecovery" event script on all nodes */
1411         ret = run_startrecovery_eventscript(rec, nodemap);
1412         if (ret!=0) {
1413                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1414                 return -1;
1415         }
1416
1417         /*
1418           update all nodes to have the same flags that we have
1419          */
1420         for (i=0;i<nodemap->num;i++) {
1421                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1422                         continue;
1423                 }
1424
1425                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1426                 if (ret != 0) {
1427                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1428                         return -1;
1429                 }
1430         }
1431
1432         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1433
1434         /* pick a new generation number */
1435         generation = new_generation();
1436
1437         /* change the vnnmap on this node to use the new generation
1438            number, but not on any other nodes.
1439            This guarantees that if we abort the recovery prematurely
1440            for some reason (e.g. a node stops responding) we can just
1441            return immediately and we will re-enter recovery shortly
1442            afterwards.
1443            I.e. we deliberately leave the cluster with an inconsistent
1444            generation id so that we can abort recovery at any stage and
1445            just restart it from scratch.
1446          */
1447         vnnmap->generation = generation;
1448         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1449         if (ret != 0) {
1450                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1451                 return -1;
1452         }
1453
1454         data.dptr = (void *)&generation;
1455         data.dsize = sizeof(uint32_t);
1456
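        /* The payload for TRANSACTION_START (and the matching
           TRANSACTION_COMMIT below) is the new generation number, so all
           nodes tag this recovery transaction with the same id.  This is a
           reading of how the controls are used here; the daemon-side
           handling of the controls lives outside this file. */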
1457         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1458         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1459                                         nodes, 0,
1460                                         CONTROL_TIMEOUT(), false, data,
1461                                         NULL,
1462                                         transaction_start_fail_callback,
1463                                         rec) != 0) {
1464                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1465                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1466                                         nodes, 0,
1467                                         CONTROL_TIMEOUT(), false, tdb_null,
1468                                         NULL,
1469                                         NULL,
1470                                         NULL) != 0) {
1471                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1472                 }
1473                 return -1;
1474         }
1475
1476         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1477
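        /* Recover each database in turn inside the cluster-wide
           transaction started above; a failure for any single database
           aborts the whole recovery, which will then be retried from
           scratch. */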
1478         for (i=0;i<dbmap->num;i++) {
1479                 ret = recover_database(rec, mem_ctx,
1480                                        dbmap->dbs[i].dbid,
1481                                        dbmap->dbs[i].persistent,
1482                                        pnn, nodemap, generation);
1483                 if (ret != 0) {
1484                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1485                         return -1;
1486                 }
1487         }
1488
1489         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1490
1491         /* commit all the changes */
1492         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1493                                         nodes, 0,
1494                                         CONTROL_TIMEOUT(), false, data,
1495                                         NULL, NULL,
1496                                         NULL) != 0) {
1497                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1498                 return -1;
1499         }
1500
1501         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1502         
1503
1504         /* update the capabilities for all nodes */
1505         ret = update_capabilities(ctdb, nodemap);
1506         if (ret!=0) {
1507                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1508                 return -1;
1509         }
1510
1511         /* build a new vnn map with all the currently active and
1512            unbanned nodes */
1513         generation = new_generation();
1514         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1515         CTDB_NO_MEMORY(ctdb, vnnmap);
1516         vnnmap->generation = generation;
1517         vnnmap->size = 0;
1518         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1519         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1520         for (i=j=0;i<nodemap->num;i++) {
1521                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1522                         continue;
1523                 }
1524                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1525                         /* this node cannot be an lmaster */
1526                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1527                         continue;
1528                 }
1529
1530                 vnnmap->size++;
1531                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1532                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1533                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1534
1535         }
1536         if (vnnmap->size == 0) {
1537                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1538                 vnnmap->size++;
1539                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1540                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1541                 vnnmap->map[0] = pnn;
1542         }       
1543
1544         /* update to the new vnnmap on all nodes */
1545         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1546         if (ret != 0) {
1547                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1548                 return -1;
1549         }
1550
1551         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1552
1553         /* update recmaster to point to us for all nodes */
1554         ret = set_recovery_master(ctdb, nodemap, pnn);
1555         if (ret!=0) {
1556                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1557                 return -1;
1558         }
1559
1560         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1561
1562         /*
1563           update all nodes to have the same flags that we have
1564          */
1565         for (i=0;i<nodemap->num;i++) {
1566                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1567                         continue;
1568                 }
1569
1570                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1571                 if (ret != 0) {
1572                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1573                         return -1;
1574                 }
1575         }
1576
1577         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1578
1579         /* disable recovery mode */
1580         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1581         if (ret != 0) {
1582                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1583                 return -1;
1584         }
1585
1586         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1587
1588         /*
1589           tell nodes to takeover their public IPs
1590          */
1591         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1592         if (ret != 0) {
1593                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1594                                  culprit));
1595                 return -1;
1596         }
1597         rec->need_takeover_run = false;
1598         ret = ctdb_takeover_run(ctdb, nodemap);
1599         if (ret != 0) {
1600                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1601                 return -1;
1602         }
1603         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1604
1605         /* execute the "recovered" event script on all nodes */
1606         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1607         if (ret!=0) {
1608                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1609                 return -1;
1610         }
1611
1612         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1613
1614         /* send a message to all clients telling them that the cluster 
1615            has been reconfigured */
1616         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
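        /* The consumers of CTDB_SRVID_RECONFIGURE live outside this file;
           the assumption is that cluster-aware clients (for example smbd)
           use it to drop state that may have gone stale across the
           recovery. */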
1617
1618         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1619
1620         rec->need_recovery = false;
1621
1622         /* we managed to complete a full recovery, make sure to forgive
1623            any past sins by the nodes that could now participate in the
1624            recovery.
1625         */
1626         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1627         for (i=0;i<nodemap->num;i++) {
1628                 struct ctdb_banning_state *ban_state;
1629
1630                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1631                         continue;
1632                 }
1633
1634                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1635                 if (ban_state == NULL) {
1636                         continue;
1637                 }
1638
1639                 ban_state->count = 0;
1640         }
1641
1642
1643         /* We just finished a recovery successfully. 
1644            We now wait for rerecovery_timeout before we allow 
1645            another recovery to take place.
1646         */
1647         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries suppressed for the rerecovery timeout\n"));
1648         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1649         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1650
1651         return 0;
1652 }
1653
1654
1655 /*
1656   elections are won by first checking the number of connected nodes, then
1657   the priority time, then the pnn
1658  */
1659 struct election_message {
1660         uint32_t num_connected;
1661         struct timeval priority_time;
1662         uint32_t pnn;
1663         uint32_t node_flags;
1664 };
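/* A sketch of how ctdb_election_win() below orders two candidates
   (banned or stopped candidates lose outright):

     candidate A: num_connected=3, priority_time=10:00:00, pnn=2
     candidate B: num_connected=3, priority_time=09:55:00, pnn=1

   Connectivity ties, so the earlier priority_time decides: B has been
   running longer and therefore beats A.  Only if that also tied would the
   pnn be used as the final tie-breaker.  The timestamps and pnns above are
   purely illustrative. */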
1665
1666 /*
1667   form this node's election data
1668  */
1669 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1670 {
1671         int ret, i;
1672         struct ctdb_node_map *nodemap;
1673         struct ctdb_context *ctdb = rec->ctdb;
1674
1675         ZERO_STRUCTP(em);
1676
1677         em->pnn = rec->ctdb->pnn;
1678         em->priority_time = rec->priority_time;
1679
1680         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1681         if (ret != 0) {
1682                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1683                 return;
1684         }
1685
1686         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1687         em->node_flags = rec->node_flags;
1688
1689         for (i=0;i<nodemap->num;i++) {
1690                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1691                         em->num_connected++;
1692                 }
1693         }
1694
1695         /* we shouldn't try to win this election if we can't be a recmaster */
1696         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1697                 em->num_connected = 0;
1698                 em->priority_time = timeval_current();
1699         }
1700
1701         talloc_free(nodemap);
1702 }
1703
1704 /*
1705   see if the given election data wins
1706  */
1707 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1708 {
1709         struct election_message myem;
1710         int cmp = 0;
1711
1712         ctdb_election_data(rec, &myem);
1713
1714         /* we can't win if we don't have the recmaster capability */
1715         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1716                 return false;
1717         }
1718
1719         /* we can't win if we are banned */
1720         if (rec->node_flags & NODE_FLAGS_BANNED) {
1721                 return false;
1722         }       
1723
1724         /* we can't win if we are stopped */
1725         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1726                 return false;
1727         }       
1728
1729         /* we will automatically win if the other node is banned */
1730         if (em->node_flags & NODE_FLAGS_BANNED) {
1731                 return true;
1732         }
1733
1734         /* we will automatically win if the other node is stopped */
1735         if (em->node_flags & NODE_FLAGS_STOPPED) {
1736                 return true;
1737         }
1738
1739         /* try to use the most connected node */
1740         if (cmp == 0) {
1741                 cmp = (int)myem.num_connected - (int)em->num_connected;
1742         }
1743
1744         /* then the longest running node */
1745         if (cmp == 0) {
1746                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1747         }
1748
1749         if (cmp == 0) {
1750                 cmp = (int)myem.pnn - (int)em->pnn;
1751         }
1752
1753         return cmp > 0;
1754 }
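/* The election itself is message based: force_election() below broadcasts
   our election_message on CTDB_SRVID_RECOVERY, each node's
   election_handler() (registered elsewhere for that srvid) compares the
   incoming data with its own using ctdb_election_win(), and a node that
   believes it has the better claim schedules a broadcast of its own in
   response.  The election settles when the election timeout expires
   without anyone objecting. */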
1755
1756 /*
1757   send out an election request
1758  */
1759 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1760 {
1761         int ret;
1762         TDB_DATA election_data;
1763         struct election_message emsg;
1764         uint64_t srvid;
1765         struct ctdb_context *ctdb = rec->ctdb;
1766
1767         srvid = CTDB_SRVID_RECOVERY;
1768
1769         ctdb_election_data(rec, &emsg);
1770
1771         election_data.dsize = sizeof(struct election_message);
1772         election_data.dptr  = (unsigned char *)&emsg;
1773
1774
1775         /* send an election message to all active nodes */
1776         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1777         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1778
1779
1780         /* A new node that is already frozen has entered the cluster.
1781            The existing nodes are not frozen and don't need to be frozen
1782            until the election has ended and we start the actual recovery
1783         */
1784         if (update_recmaster == true) {
1785                 /* first we assume we will win the election and set 
1786                    recoverymaster to be ourself on the current node
1787                  */
1788                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1789                 if (ret != 0) {
1790                         DEBUG(DEBUG_ERR, (__location__ " failed to set ourselves as recmaster\n"));
1791                         return -1;
1792                 }
1793         }
1794
1795
1796         return 0;
1797 }
1798
1799 /*
1800   this function will unban all nodes in the cluster
1801 */
1802 static void unban_all_nodes(struct ctdb_context *ctdb)
1803 {
1804         int ret, i;
1805         struct ctdb_node_map *nodemap;
1806         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1807         
1808         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1809         if (ret != 0) {
1810                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1811                 return;
1812         }
1813
1814         for (i=0;i<nodemap->num;i++) {
1815                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1816                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1817                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1818                 }
1819         }
1820
1821         talloc_free(tmp_ctx);
1822 }
1823
1824
1825 /*
1826   we think we are winning the election - send a broadcast election request
1827  */
1828 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1829 {
1830         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1831         int ret;
1832
1833         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1834         if (ret != 0) {
1835                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1836         }
1837
1838         talloc_free(rec->send_election_te);
1839         rec->send_election_te = NULL;
1840 }
1841
1842 /*
1843   handler for memory dumps
1844 */
1845 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1846                              TDB_DATA data, void *private_data)
1847 {
1848         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1849         TDB_DATA *dump;
1850         int ret;
1851         struct rd_memdump_reply *rd;
1852
1853         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1854                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1855                 talloc_free(tmp_ctx);
1856                 return;
1857         }
1858         rd = (struct rd_memdump_reply *)data.dptr;
1859
1860         dump = talloc_zero(tmp_ctx, TDB_DATA);
1861         if (dump == NULL) {
1862                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1863                 talloc_free(tmp_ctx);
1864                 return;
1865         }
1866         ret = ctdb_dump_memory(ctdb, dump);
1867         if (ret != 0) {
1868                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1869                 talloc_free(tmp_ctx);
1870                 return;
1871         }
1872
1873         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1874
1875         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1876         if (ret != 0) {
1877                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1878                 talloc_free(tmp_ctx);
1879                 return;
1880         }
1881
1882         talloc_free(tmp_ctx);
1883 }
1884
1885 /*
1886   handler for reload_nodes
1887 */
1888 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1889                              TDB_DATA data, void *private_data)
1890 {
1891         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1892
1893         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1894
1895         reload_nodes_file(rec->ctdb);
1896 }
1897
1898
1899 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1900                               struct timeval yt, void *p)
1901 {
1902         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1903
1904         talloc_free(rec->ip_check_disable_ctx);
1905         rec->ip_check_disable_ctx = NULL;
1906 }
1907
1908
1909 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1910                              TDB_DATA data, void *private_data)
1911 {
1912         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1913         struct ctdb_public_ip *ip;
1914
1915         if (rec->recmaster != rec->ctdb->pnn) {
1916                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1917                 return;
1918         }
1919
1920         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1921                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1922                 return;
1923         }
1924
1925         ip = (struct ctdb_public_ip *)data.dptr;
1926
1927         update_ip_assignment_tree(rec->ctdb, ip);
1928 }
1929
1930
1931 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1932                              TDB_DATA data, void *private_data)
1933 {
1934         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1935         uint32_t timeout;
1936
1937         if (rec->ip_check_disable_ctx != NULL) {
1938                 talloc_free(rec->ip_check_disable_ctx);
1939                 rec->ip_check_disable_ctx = NULL;
1940         }
1941
1942         if (data.dsize != sizeof(uint32_t)) {
1943                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1944                                  "expecting %lu\n", (long unsigned)data.dsize,
1945                                  (long unsigned)sizeof(uint32_t)));
1946                 return;
1947         }
1948         if (data.dptr == NULL) {
1949                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1950                 return;
1951         }
1952
1953         timeout = *((uint32_t *)data.dptr);
1954         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1955
1956         rec->ip_check_disable_ctx = talloc_new(rec);
1957         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1958
1959         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1960 }
1961
1962
1963 /*
1964   handler for ip reallocate, just add it to the list of callers and 
1965   handle this later in the monitor_cluster loop so we do not recurse
1966   with other callers to takeover_run()
1967 */
1968 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1969                              TDB_DATA data, void *private_data)
1970 {
1971         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1972         struct ip_reallocate_list *caller;
1973
1974         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1975                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1976                 return;
1977         }
1978
1979         if (rec->ip_reallocate_ctx == NULL) {
1980                 rec->ip_reallocate_ctx = talloc_new(rec);
1981                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
1982         }
1983
1984         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1985         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1986
1987         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1988         caller->next = rec->reallocate_callers;
1989         rec->reallocate_callers = caller;
1990
1991         return;
1992 }
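/* A caller is expected to trigger the handler above by sending a struct
   carrying its own pnn and a reply srvid (srvid 0 means no reply is
   wanted); the int32_t result of the takeover run is sent back to that
   address by process_ipreallocate_requests() below.  A minimal sketch of
   such a caller, modelled on verify_local_ip_allocation() later in this
   file:

	struct takeover_run_reply rd;
	TDB_DATA d;

	rd.pnn   = ctdb->pnn;
	rd.srvid = 0;
	d.dptr   = (uint8_t *)&rd;
	d.dsize  = sizeof(rd);
	ctdb_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, d);

   where "recmaster" stands for whatever pnn the caller believes currently
   holds the recovery master role. */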
1993
1994 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1995 {
1996         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1997         TDB_DATA result;
1998         int32_t ret;
1999         struct ip_reallocate_list *callers;
2000         uint32_t culprit;
2001
2002         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2003
2004         /* update the list of public ips that a node can handle for
2005            all connected nodes
2006         */
2007         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2008         if (ret != 0) {
2009                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2010                                  culprit));
2011                 rec->need_takeover_run = true;
2012         }
2013         if (ret == 0) {
2014                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2015                 if (ret != 0) {
2016                         DEBUG(DEBUG_ERR,(__location__ " Unable to setup public "
2017                                          "takeover addresses\n"));
2018                         rec->need_takeover_run = true;
2019                 }
2020         }
2021
2022         result.dsize = sizeof(int32_t);
2023         result.dptr  = (uint8_t *)&ret;
2024
2025         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2026
2027                 /* Someone that sent srvid==0 does not want a reply */
2028                 if (callers->rd->srvid == 0) {
2029                         continue;
2030                 }
2031                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2032                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2033                                   (unsigned long long)callers->rd->srvid));
2034                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2035                 if (ret != 0) {
2036                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2037                                          "message to %u:%llu\n",
2038                                          (unsigned)callers->rd->pnn,
2039                                          (unsigned long long)callers->rd->srvid));
2040                 }
2041         }
2042
2043         talloc_free(tmp_ctx);
2044         talloc_free(rec->ip_reallocate_ctx);
2045         rec->ip_reallocate_ctx = NULL;
2046         rec->reallocate_callers = NULL;
2047         
2048 }
2049
2050
2051 /*
2052   handler for recovery master elections
2053 */
2054 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2055                              TDB_DATA data, void *private_data)
2056 {
2057         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2058         int ret;
2059         struct election_message *em = (struct election_message *)data.dptr;
2060         TALLOC_CTX *mem_ctx;
2061
2062         /* we got an election packet - update the timeout for the election */
2063         talloc_free(rec->election_timeout);
2064         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2065                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2066                                                 ctdb_election_timeout, rec);
2067
2068         mem_ctx = talloc_new(ctdb);
2069
2070         /* someone called an election. check their election data
2071            and if we disagree and we would rather be the elected node, 
2072            send a new election message to all other nodes
2073          */
2074         if (ctdb_election_win(rec, em)) {
2075                 if (!rec->send_election_te) {
2076                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2077                                                                 timeval_current_ofs(0, 500000),
2078                                                                 election_send_request, rec);
2079                 }
2080                 talloc_free(mem_ctx);
2081                 /*unban_all_nodes(ctdb);*/
2082                 return;
2083         }
2084         
2085         /* we didn't win */
2086         talloc_free(rec->send_election_te);
2087         rec->send_election_te = NULL;
2088
2089         if (ctdb->tunable.verify_recovery_lock != 0) {
2090                 /* release the recmaster lock */
2091                 if (em->pnn != ctdb->pnn &&
2092                     ctdb->recovery_lock_fd != -1) {
2093                         close(ctdb->recovery_lock_fd);
2094                         ctdb->recovery_lock_fd = -1;
2095                         unban_all_nodes(ctdb);
2096                 }
2097         }
2098
2099         /* ok, let that guy become recmaster then */
2100         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2101         if (ret != 0) {
2102                 DEBUG(DEBUG_ERR, (__location__ " failed to set the new recmaster on this node\n"));
2103                 talloc_free(mem_ctx);
2104                 return;
2105         }
2106
2107         talloc_free(mem_ctx);
2108         return;
2109 }
2110
2111
2112 /*
2113   force the start of the election process
2114  */
2115 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2116                            struct ctdb_node_map *nodemap)
2117 {
2118         int ret;
2119         struct ctdb_context *ctdb = rec->ctdb;
2120
2121         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2122
2123         /* set all nodes to recovery mode to stop all internode traffic */
2124         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2125         if (ret != 0) {
2126                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2127                 return;
2128         }
2129
2130         talloc_free(rec->election_timeout);
2131         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2132                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2133                                                 ctdb_election_timeout, rec);
2134
2135         ret = send_election_request(rec, pnn, true);
2136         if (ret!=0) {
2137                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2138                 return;
2139         }
2140
2141         /* wait for a few seconds to collect all responses */
2142         ctdb_wait_election(rec);
2143 }
2144
2145
2146
2147 /*
2148   handler for when a node changes its flags
2149 */
2150 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2151                             TDB_DATA data, void *private_data)
2152 {
2153         int ret;
2154         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2155         struct ctdb_node_map *nodemap=NULL;
2156         TALLOC_CTX *tmp_ctx;
2157         int i;
2158         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2159         int disabled_flag_changed;
2160
2161         if (data.dsize != sizeof(*c)) {
2162                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2163                 return;
2164         }
2165
2166         tmp_ctx = talloc_new(ctdb);
2167         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2168
2169         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2170         if (ret != 0) {
2171                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2172                 talloc_free(tmp_ctx);
2173                 return;         
2174         }
2175
2176
2177         for (i=0;i<nodemap->num;i++) {
2178                 if (nodemap->nodes[i].pnn == c->pnn) break;
2179         }
2180
2181         if (i == nodemap->num) {
2182                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2183                 talloc_free(tmp_ctx);
2184                 return;
2185         }
2186
2187         if (nodemap->nodes[i].flags != c->new_flags) {
2188                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2189         }
2190
2191         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2192
2193         nodemap->nodes[i].flags = c->new_flags;
2194
2195         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2196                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2197
2198         if (ret == 0) {
2199                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2200                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2201         }
2202         
2203         if (ret == 0 &&
2204             ctdb->recovery_master == ctdb->pnn &&
2205             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2206                 /* Only do the takeover run if the perm disabled or unhealthy
2207                    flags changed since these will cause an ip failover but not
2208                    a recovery.
2209                    If the node became disconnected or banned this will also
2210                    lead to an ip address failover but that is handled 
2211                    during recovery
2212                 */
2213                 if (disabled_flag_changed) {
2214                         rec->need_takeover_run = true;
2215                 }
2216         }
2217
2218         talloc_free(tmp_ctx);
2219 }
2220
2221 /*
2222   handler for when we need to push out flag changes to all other nodes
2223 */
2224 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2225                             TDB_DATA data, void *private_data)
2226 {
2227         int ret;
2228         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2229         struct ctdb_node_map *nodemap=NULL;
2230         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2231         uint32_t recmaster;
2232         uint32_t *nodes;
2233
2234         /* find the recovery master */
2235         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2236         if (ret != 0) {
2237                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2238                 talloc_free(tmp_ctx);
2239                 return;
2240         }
2241
2242         /* read the node flags from the recmaster */
2243         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2244         if (ret != 0) {
2245                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster (node %u)\n", recmaster));
2246                 talloc_free(tmp_ctx);
2247                 return;
2248         }
2249         if (c->pnn >= nodemap->num) {
2250                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2251                 talloc_free(tmp_ctx);
2252                 return;
2253         }
2254
2255         /* send the flags update to all connected nodes */
2256         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2257
2258         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2259                                       nodes, 0, CONTROL_TIMEOUT(),
2260                                       false, data,
2261                                       NULL, NULL,
2262                                       NULL) != 0) {
2263                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2264
2265                 talloc_free(tmp_ctx);
2266                 return;
2267         }
2268
2269         talloc_free(tmp_ctx);
2270 }
2271
2272
2273 struct verify_recmode_normal_data {
2274         uint32_t count;
2275         enum monitor_result status;
2276 };
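/* verify_recmode() below follows the async fan-out pattern the recovery
   daemon uses elsewhere: send a getrecmode control to every active node,
   count the outstanding replies in rmdata->count, and spin the event loop
   until the callback has decremented the count to zero.  A single node
   that fails to answer, or that reports it is still in recovery mode, is
   enough to make the whole check return a non-OK result. */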
2277
2278 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2279 {
2280         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2281
2282
2283         /* one more node has responded with recmode data */
2284         rmdata->count--;
2285
2286         /* if we failed to get the recmode, then return an error and let
2287            the main loop try again.
2288         */
2289         if (state->state != CTDB_CONTROL_DONE) {
2290                 if (rmdata->status == MONITOR_OK) {
2291                         rmdata->status = MONITOR_FAILED;
2292                 }
2293                 return;
2294         }
2295
2296         /* if we got a response, then the recmode will be stored in the
2297            status field
2298         */
2299         if (state->status != CTDB_RECOVERY_NORMAL) {
2300                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2301                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2302         }
2303
2304         return;
2305 }
2306
2307
2308 /* verify that all nodes are in normal recovery mode */
2309 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2310 {
2311         struct verify_recmode_normal_data *rmdata;
2312         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2313         struct ctdb_client_control_state *state;
2314         enum monitor_result status;
2315         int j;
2316         
2317         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2318         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2319         rmdata->count  = 0;
2320         rmdata->status = MONITOR_OK;
2321
2322         /* loop over all active nodes and send an async getrecmode call to 
2323            them*/
2324         for (j=0; j<nodemap->num; j++) {
2325                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2326                         continue;
2327                 }
2328                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2329                                         CONTROL_TIMEOUT(), 
2330                                         nodemap->nodes[j].pnn);
2331                 if (state == NULL) {
2332                         /* we failed to send the control, treat this as 
2333                            an error and try again next iteration
2334                         */                      
2335                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2336                         talloc_free(mem_ctx);
2337                         return MONITOR_FAILED;
2338                 }
2339
2340                 /* set up the callback functions */
2341                 state->async.fn = verify_recmode_normal_callback;
2342                 state->async.private_data = rmdata;
2343
2344                 /* one more control to wait for to complete */
2345                 rmdata->count++;
2346         }
2347
2348
2349         /* now wait for up to the maximum number of seconds allowed
2350            or until all nodes we expect a response from have replied
2351         */
2352         while (rmdata->count > 0) {
2353                 event_loop_once(ctdb->ev);
2354         }
2355
2356         status = rmdata->status;
2357         talloc_free(mem_ctx);
2358         return status;
2359 }
2360
2361
2362 struct verify_recmaster_data {
2363         struct ctdb_recoverd *rec;
2364         uint32_t count;
2365         uint32_t pnn;
2366         enum monitor_result status;
2367 };
2368
2369 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2370 {
2371         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2372
2373
2374         /* one more node has responded with recmaster data */
2375         rmdata->count--;
2376
2377         /* if we failed to get the recmaster, then return an error and let
2378            the main loop try again.
2379         */
2380         if (state->state != CTDB_CONTROL_DONE) {
2381                 if (rmdata->status == MONITOR_OK) {
2382                         rmdata->status = MONITOR_FAILED;
2383                 }
2384                 return;
2385         }
2386
2387         /* if we got a response, then the recmaster will be stored in the
2388            status field
2389         */
2390         if (state->status != rmdata->pnn) {
2391                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2392                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2393                 rmdata->status = MONITOR_ELECTION_NEEDED;
2394         }
2395
2396         return;
2397 }
2398
2399
2400 /* verify that all nodes agree that we are the recmaster */
2401 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2402 {
2403         struct ctdb_context *ctdb = rec->ctdb;
2404         struct verify_recmaster_data *rmdata;
2405         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2406         struct ctdb_client_control_state *state;
2407         enum monitor_result status;
2408         int j;
2409         
2410         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2411         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2412         rmdata->rec    = rec;
2413         rmdata->count  = 0;
2414         rmdata->pnn    = pnn;
2415         rmdata->status = MONITOR_OK;
2416
2417         /* loop over all active nodes and send an async getrecmaster call to 
2418            them*/
2419         for (j=0; j<nodemap->num; j++) {
2420                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2421                         continue;
2422                 }
2423                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2424                                         CONTROL_TIMEOUT(),
2425                                         nodemap->nodes[j].pnn);
2426                 if (state == NULL) {
2427                         /* we failed to send the control, treat this as 
2428                            an error and try again next iteration
2429                         */                      
2430                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2431                         talloc_free(mem_ctx);
2432                         return MONITOR_FAILED;
2433                 }
2434
2435                 /* set up the callback functions */
2436                 state->async.fn = verify_recmaster_callback;
2437                 state->async.private_data = rmdata;
2438
2439                 /* one more control to wait for to complete */
2440                 rmdata->count++;
2441         }
2442
2443
2444         /* now wait for up to the maximum number of seconds allowed
2445            or until all nodes we expect a response from have replied
2446         */
2447         while (rmdata->count > 0) {
2448                 event_loop_once(ctdb->ev);
2449         }
2450
2451         status = rmdata->status;
2452         talloc_free(mem_ctx);
2453         return status;
2454 }
2455
2456
2457 /* called to check that the local allocation of public ip addresses is ok.
2458 */
2459 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2460 {
2461         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2462         struct ctdb_control_get_ifaces *ifaces = NULL;
2463         struct ctdb_all_public_ips *ips = NULL;
2464         struct ctdb_uptime *uptime1 = NULL;
2465         struct ctdb_uptime *uptime2 = NULL;
2466         int ret, j;
2467         bool need_iface_check = false;
2468         bool need_takeover_run = false;
2469
2470         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2471                                 CTDB_CURRENT_NODE, &uptime1);
2472         if (ret != 0) {
2473                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2474                 talloc_free(mem_ctx);
2475                 return -1;
2476         }
2477
2478
2479         /* read the interfaces from the local node */
2480         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2481         if (ret != 0) {
2482                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2483                 talloc_free(mem_ctx);
2484                 return -1;
2485         }
2486
2487         if (!rec->ifaces) {
2488                 need_iface_check = true;
2489         } else if (rec->ifaces->num != ifaces->num) {
2490                 need_iface_check = true;
2491         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2492                 need_iface_check = true;
2493         }
2494
2495         if (need_iface_check) {
2496                 DEBUG(DEBUG_NOTICE, ("The interface status has changed on "
2497                                      "local node %u - force takeover run\n",
2498                                      pnn));
2499                 need_takeover_run = true;
2500         }
2501
2502         /* read the ip allocation from the local node */
2503         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2504         if (ret != 0) {
2505                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2506                 talloc_free(mem_ctx);
2507                 return -1;
2508         }
2509
2510         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2511                                 CTDB_CURRENT_NODE, &uptime2);
2512         if (ret != 0) {
2513                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2514                 talloc_free(mem_ctx);
2515                 return -1;
2516         }
2517
2518         /* skip the check if the startrecovery time has changed */
2519         if (timeval_compare(&uptime1->last_recovery_started,
2520                             &uptime2->last_recovery_started) != 0) {
2521                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2522                 talloc_free(mem_ctx);
2523                 return 0;
2524         }
2525
2526         /* skip the check if the endrecovery time has changed */
2527         if (timeval_compare(&uptime1->last_recovery_finished,
2528                             &uptime2->last_recovery_finished) != 0) {
2529                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2530                 talloc_free(mem_ctx);
2531                 return 0;
2532         }
2533
2534         /* skip the check if we have started but not finished recovery */
2535         if (timeval_compare(&uptime1->last_recovery_finished,
2536                             &uptime1->last_recovery_started) != 1) {
2537                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2538                 talloc_free(mem_ctx);
2539
2540                 return 0;
2541         }
2542
2543         talloc_free(rec->ifaces);
2544         rec->ifaces = talloc_steal(rec, ifaces);
2545
2546         /* verify that we have the ip addresses we should have
2547            and that we don't have ones we shouldn't have.
2548            If we find an inconsistency we ask the recmaster to
2549            perform a takeover run so that the public ip allocation
2550            is put right again.
2551         */
2552         for (j=0; j<ips->num; j++) {
2553                 if (ips->ips[j].pnn == pnn) {
2554                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2555                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2556                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2557                                 need_takeover_run = true;
2558                         }
2559                 } else {
2560                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2561                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2562                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2563                                 need_takeover_run = true;
2564                         }
2565                 }
2566         }
2567
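        /* If anything is out of step we do not fix it locally; instead we
           nudge the recovery master: the message below carries our pnn and
           srvid 0 (no reply wanted) and is handled by ip_reallocate_handler()
           on the recmaster, which queues it so that the monitor loop can
           perform the actual takeover run. */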
2568         if (need_takeover_run) {
2569                 struct takeover_run_reply rd;
2570                 TDB_DATA data;
2571
2572                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2573
2574                 rd.pnn = ctdb->pnn;
2575                 rd.srvid = 0;
2576                 data.dptr = (uint8_t *)&rd;
2577                 data.dsize = sizeof(rd);
2578
2579                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2580                 if (ret != 0) {
2581                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2582                 }
2583         }
2584         talloc_free(mem_ctx);
2585         return 0;
2586 }
2587
2588
2589 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2590 {
2591         struct ctdb_node_map **remote_nodemaps = callback_data;
2592
2593         if (node_pnn >= ctdb->num_nodes) {
2594                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2595                 return;
2596         }
2597
2598         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2599
2600 }
2601
2602 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2603         struct ctdb_node_map *nodemap,
2604         struct ctdb_node_map **remote_nodemaps)
2605 {
2606         uint32_t *nodes;
2607
2608         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2609         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2610                                         nodes, 0,
2611                                         CONTROL_TIMEOUT(), false, tdb_null,
2612                                         async_getnodemap_callback,
2613                                         NULL,
2614                                         remote_nodemaps) != 0) {
2615                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2616
2617                 return -1;
2618         }
2619
2620         return 0;
2621 }
2622
2623 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2624 struct ctdb_check_reclock_state {
2625         struct ctdb_context *ctdb;
2626         struct timeval start_time;
2627         int fd[2];
2628         pid_t child;
2629         struct timed_event *te;
2630         struct fd_event *fde;
2631         enum reclock_child_status status;
2632 };
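/* check_recovery_lock() below verifies that the recovery lock file is still
   readable without letting the recovery daemon itself block on a hung
   cluster filesystem: a child process performs the pread() and reports a
   single status byte back over a pipe, while the parent waits in the event
   loop with a 15 second timer.  If the child hangs, the timer fires and
   marks the state RECLOCK_TIMEOUT, and the destructor makes sure the stuck
   child is killed once the state is freed. */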
2633
2634 /* when we free the reclock state we must kill any child process.
2635 */
2636 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2637 {
2638         struct ctdb_context *ctdb = state->ctdb;
2639
2640         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2641
2642         if (state->fd[0] != -1) {
2643                 close(state->fd[0]);
2644                 state->fd[0] = -1;
2645         }
2646         if (state->fd[1] != -1) {
2647                 close(state->fd[1]);
2648                 state->fd[1] = -1;
2649         }
2650         kill(state->child, SIGKILL);
2651         return 0;
2652 }
2653
2654 /*
2655   called if our check_reclock child times out. this would happen if
2656   i/o to the reclock file blocks.
2657  */
2658 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2659                                          struct timeval t, void *private_data)
2660 {
2661         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2662                                            struct ctdb_check_reclock_state);
2663
2664         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - CFS slow to grant locks?\n"));
2665         state->status = RECLOCK_TIMEOUT;
2666 }
2667
2668 /* this is called when the child process has completed checking the reclock
2669    file and has written data back to us through the pipe.
2670 */
2671 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2672                              uint16_t flags, void *private_data)
2673 {
2674         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2675                                              struct ctdb_check_reclock_state);
2676         char c = 0;
2677         int ret;
2678
2679         /* we got a response from our child process so we can abort the
2680            timeout.
2681         */
2682         talloc_free(state->te);
2683         state->te = NULL;
2684
2685         ret = read(state->fd[0], &c, 1);
2686         if (ret != 1 || c != RECLOCK_OK) {
2687                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2688                 state->status = RECLOCK_FAILED;
2689
2690                 return;
2691         }
2692
2693         state->status = RECLOCK_OK;
2694         return;
2695 }
2696
2697 static int check_recovery_lock(struct ctdb_context *ctdb)
2698 {
2699         int ret;
2700         struct ctdb_check_reclock_state *state;
2701         pid_t parent = getpid();
2702
2703         if (ctdb->recovery_lock_fd == -1) {
2704                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2705                 return -1;
2706         }
2707
2708         state = talloc(ctdb, struct ctdb_check_reclock_state);
2709         CTDB_NO_MEMORY(ctdb, state);
2710
2711         state->ctdb = ctdb;
2712         state->start_time = timeval_current();
2713         state->status = RECLOCK_CHECKING;
2714         state->fd[0] = -1;
2715         state->fd[1] = -1;
2716
2717         ret = pipe(state->fd);
2718         if (ret != 0) {
2719                 talloc_free(state);
2720                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2721                 return -1;
2722         }
2723
2724         state->child = fork();
2725         if (state->child == (pid_t)-1) {
2726                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2727                 close(state->fd[0]);
2728                 state->fd[0] = -1;
2729                 close(state->fd[1]);
2730                 state->fd[1] = -1;
2731                 talloc_free(state);
2732                 return -1;
2733         }
2734
2735         if (state->child == 0) {
2736                 char cc = RECLOCK_OK;
2737                 close(state->fd[0]);
2738                 state->fd[0] = -1;
2739
2740                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2741                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2742                         cc = RECLOCK_FAILED;
2743                 }
2744
2745                 write(state->fd[1], &cc, 1);
2746                 /* make sure we die when our parent dies */
2747                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2748                         sleep(5);
2749                         write(state->fd[1], &cc, 1);
2750                 }
2751                 _exit(0);
2752         }
2753         close(state->fd[1]);
2754         state->fd[1] = -1;
2755         set_close_on_exec(state->fd[0]);
2756
2757         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2758
2759         talloc_set_destructor(state, check_reclock_destructor);
2760
2761         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2762                                     ctdb_check_reclock_timeout, state);
2763         if (state->te == NULL) {
2764                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2765                 talloc_free(state);
2766                 return -1;
2767         }
2768
2769         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2770                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2771                                 reclock_child_handler,
2772                                 (void *)state);
2773
2774         if (state->fde == NULL) {
2775                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2776                 talloc_free(state);
2777                 return -1;
2778         }
2779
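        /* run the event loop until either the child's reply or the
           timeout handler changes state->status */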
2780         while (state->status == RECLOCK_CHECKING) {
2781                 event_loop_once(ctdb->ev);
2782         }
2783
2784         if (state->status == RECLOCK_FAILED) {
2785                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2786                 close(ctdb->recovery_lock_fd);
2787                 ctdb->recovery_lock_fd = -1;
2788                 talloc_free(state);
2789                 return -1;
2790         }
2791
2792         talloc_free(state);
2793         return 0;
2794 }
2795
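/*
  fetch the currently configured recovery lock file from the local
  daemon and reconcile our cached copy with it: handle the reclock
  being disabled, being set for the first time, or changing path,
  closing any now-stale file descriptor along the way.
 */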
2796 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2797 {
2798         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2799         const char *reclockfile;
2800
2801         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2802                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2803                 talloc_free(tmp_ctx);
2804                 return -1;      
2805         }
2806
2807         if (reclockfile == NULL) {
2808                 if (ctdb->recovery_lock_file != NULL) {
2809                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2810                         talloc_free(ctdb->recovery_lock_file);
2811                         ctdb->recovery_lock_file = NULL;
2812                         if (ctdb->recovery_lock_fd != -1) {
2813                                 close(ctdb->recovery_lock_fd);
2814                                 ctdb->recovery_lock_fd = -1;
2815                         }
2816                 }
2817                 ctdb->tunable.verify_recovery_lock = 0;
2818                 talloc_free(tmp_ctx);
2819                 return 0;
2820         }
2821
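        /* a reclock file is configured but we have no cached copy yet -
           remember it */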
2822         if (ctdb->recovery_lock_file == NULL) {
2823                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2824                 if (ctdb->recovery_lock_fd != -1) {
2825                         close(ctdb->recovery_lock_fd);
2826                         ctdb->recovery_lock_fd = -1;
2827                 }
2828                 talloc_free(tmp_ctx);
2829                 return 0;
2830         }
2831
2832
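        /* same reclock file as before - nothing to do */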
2833         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2834                 talloc_free(tmp_ctx);
2835                 return 0;
2836         }
2837
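        /* the reclock file path has changed - replace the cached copy,
           stop verifying the old lock and close the now-stale fd */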
2838         talloc_free(ctdb->recovery_lock_file);
2839         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2840         ctdb->tunable.verify_recovery_lock = 0;
2841         if (ctdb->recovery_lock_fd != -1) {
2842                 close(ctdb->recovery_lock_fd);
2843                 ctdb->recovery_lock_fd = -1;
2844         }
2845
2846         talloc_free(tmp_ctx);
2847         return 0;
2848 }
2849
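/*
  one pass of the recovery daemon's monitoring logic: verify that the
  main daemon is alive, refresh tunables and the recovery lock file,
  fetch the nodemap and vnnmap, and force an election or a recovery
  whenever an inconsistency is detected.
 */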
2850 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2851                       TALLOC_CTX *mem_ctx)
2852 {
2853         uint32_t pnn;
2854         struct ctdb_node_map *nodemap=NULL;
2855         struct ctdb_node_map *recmaster_nodemap=NULL;
2856         struct ctdb_node_map **remote_nodemaps=NULL;
2857         struct ctdb_vnn_map *vnnmap=NULL;
2858         struct ctdb_vnn_map *remote_vnnmap=NULL;
2859         int32_t debug_level;
2860         int i, j, ret;
2861
2862
2863
2864         /* verify that the main daemon is still running */
2865         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2866                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2867                 exit(-1);
2868         }
2869
2870         /* ping the local daemon to tell it we are alive */
2871         ctdb_ctrl_recd_ping(ctdb);
2872
2873         if (rec->election_timeout) {
2874                 /* an election is in progress */
2875                 return;
2876         }
2877
2878         /* read the debug level from the parent and update locally */
2879         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2880         if (ret !=0) {
2881                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2882                 return;
2883         }
2884         LogLevel = debug_level;
2885
2886
2887         /* We must check whether we need to ban a node here, but we want to do
2888            this as early as possible, before we have pulled the node map from the
2889            local node. That is why we use the hardcoded value 20.
2890         */
2891         for (i=0; i<ctdb->num_nodes; i++) {
2892                 struct ctdb_banning_state *ban_state;
2893
2894                 if (ctdb->nodes[i]->ban_state == NULL) {
2895                         continue;
2896                 }
2897                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2898                 if (ban_state->count < 20) {
2899                         continue;
2900                 }
2901                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2902                         ctdb->nodes[i]->pnn, ban_state->count,
2903                         ctdb->tunable.recovery_ban_period));
2904                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2905                 ban_state->count = 0;
2906         }
2907
2908         /* get relevant tunables */
2909         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2910         if (ret != 0) {
2911                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2912                 return;
2913         }
2914
2915         /* get the current recovery lock file from the server */
2916         if (update_recovery_lock_file(ctdb) != 0) {
2917                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2918                 return;
2919         }
2920
2921         /* Make sure that if recovery lock verification becomes disabled,
2922            we close the file.
2923         */
2924         if (ctdb->tunable.verify_recovery_lock == 0) {
2925                 if (ctdb->recovery_lock_fd != -1) {
2926                         close(ctdb->recovery_lock_fd);
2927                         ctdb->recovery_lock_fd = -1;
2928                 }
2929         }
2930
2931         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2932         if (pnn == (uint32_t)-1) {
2933                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2934                 return;
2935         }
2936
2937         /* get the vnnmap */
2938         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2939         if (ret != 0) {
2940                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2941                 return;
2942         }
2943
2944
2945         /* get number of nodes */
2946         if (rec->nodemap) {
2947                 talloc_free(rec->nodemap);
2948                 rec->nodemap = NULL;
2949                 nodemap=NULL;
2950         }
2951         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2952         if (ret != 0) {
2953                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2954                 return;
2955         }
2956         nodemap = rec->nodemap;
2957
2958         /* remember our own node flags */
2959         rec->node_flags = nodemap->nodes[pnn].flags;
2960
2961         /* update the capabilities for all nodes */
2962         ret = update_capabilities(ctdb, nodemap);
2963         if (ret != 0) {
2964                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2965                 return;
2966         }
2967
2968         /* check which node is the recovery master */
2969         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2970         if (ret != 0) {
2971                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2972                 return;
2973         }
2974
2975         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
2976         if (rec->recmaster != pnn) {
2977                 if (rec->ip_reallocate_ctx != NULL) {
2978                         talloc_free(rec->ip_reallocate_ctx);
2979                         rec->ip_reallocate_ctx = NULL;
2980                         rec->reallocate_callers = NULL;
2981                 }
2982         }
2983
2984         if (rec->recmaster == (uint32_t)-1) {
2985                 DEBUG(DEBUG_NOTICE,(__location__ " No recovery master currently set - forcing election\n"));
2986                 force_election(rec, pnn, nodemap);
2987                 return;
2988         }
2989
2990         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
2991            also frozen and that the recmode is set to active.
2992         */
2993         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
2994                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2995                 if (ret != 0) {
2996                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
2997                 }
2998                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2999                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activating recovery mode and locking databases\n"));
3000
3001                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3002                         if (ret != 0) {
3003                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3004                                 return;
3005                         }
3006                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3007                         if (ret != 0) {
3008                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3009
3010                                 return;
3011                         }
3012                 }
3013
3014                 /* If this node is stopped or banned then it is not the recovery
3015                  * master, so don't do anything. This prevents a stopped or banned
3016                  * node from starting an election and sending unnecessary controls.
3017                  */
3018                 return;
3019         }
3020
3021         /*
3022          * if the current recmaster does not have CTDB_CAP_RECMASTER,
3023          * but we do, force an election and try to become the new
3024          * recmaster.
3025          */
3026         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3027             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3028              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3029                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3030                                   " but we (node %u) have - force an election\n",
3031                                   rec->recmaster, pnn));
3032                 force_election(rec, pnn, nodemap);
3033                 return;
3034         }
3035
3036         /* count how many active nodes there are */
3037         rec->num_active    = 0;
3038         rec->num_connected = 0;
3039         for (i=0; i<nodemap->num; i++) {
3040                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3041                         rec->num_active++;
3042                 }
3043                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3044                         rec->num_connected++;
3045                 }
3046         }
3047
3048
3049         /* verify that the recmaster node is still active */
3050         for (j=0; j<nodemap->num; j++) {
3051                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3052                         break;
3053                 }
3054         }
3055
3056         if (j == nodemap->num) {
3057                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3058                 force_election(rec, pnn, nodemap);
3059                 return;
3060         }
3061
3062         /* if recovery master is disconnected we must elect a new recmaster */
3063         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3064                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3065                 force_election(rec, pnn, nodemap);
3066                 return;
3067         }
3068
3069         /* grab the nodemap from the recovery master to check if it is banned */
3070         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3071                                    mem_ctx, &recmaster_nodemap);
3072         if (ret != 0) {
3073                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3074                           nodemap->nodes[j].pnn));
3075                 return;
3076         }
3077
3078
3079         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3080                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3081                 force_election(rec, pnn, nodemap);
3082                 return;
3083         }
3084
3085         /* verify that we have all the ip addresses we should have and that we
3086          * don't have addresses we shouldn't have.
3087          */ 
3088         if (ctdb->do_checkpublicip) {
3089                 if (rec->ip_check_disable_ctx == NULL) {
3090                         if (verify_local_ip_allocation(ctdb, rec, pnn) != 0) {
3091                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3092                         }
3093                 }
3094         }
3095
3096
3097         /* if we are not the recmaster then we do not need to check
3098            if recovery is needed
3099          */
3100         if (pnn != rec->recmaster) {
3101                 return;
3102         }
3103
3104
3105         /* ensure our local copies of flags are right */
3106         ret = update_local_flags(rec, nodemap);
3107         if (ret == MONITOR_ELECTION_NEEDED) {
3108                 DEBUG(DEBUG_NOTICE,("update_local_flags() requested a re-election.\n"));
3109                 force_election(rec, pnn, nodemap);
3110                 return;
3111         }
3112         if (ret != MONITOR_OK) {
3113                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3114                 return;
3115         }
3116
3117         if (ctdb->num_nodes != nodemap->num) {
3118                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3119                 reload_nodes_file(ctdb);
3120                 return;
3121         }
3122
3123         /* verify that all active nodes agree that we are the recmaster */
3124         switch (verify_recmaster(rec, nodemap, pnn)) {
3125         case MONITOR_RECOVERY_NEEDED:
3126                 /* can not happen */
3127                 return;
3128         case MONITOR_ELECTION_NEEDED:
3129                 force_election(rec, pnn, nodemap);
3130                 return;
3131         case MONITOR_OK:
3132                 break;
3133         case MONITOR_FAILED:
3134                 return;
3135         }
3136
3137
3138         if (rec->need_recovery) {
3139                 /* a previous recovery didn't finish */
3140                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3141                 return;
3142         }
3143
3144         /* verify that all active nodes are in normal mode 
3145            and not in recovery mode 
3146         */
3147         switch (verify_recmode(ctdb, nodemap)) {
3148         case MONITOR_RECOVERY_NEEDED:
3149                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3150                 return;
3151         case MONITOR_FAILED:
3152                 return;
3153         case MONITOR_ELECTION_NEEDED:
3154                 /* can not happen */
3155         case MONITOR_OK:
3156                 break;
3157         }
3158
3159
3160         if (ctdb->tunable.verify_recovery_lock != 0) {
3161                 /* we should have the reclock - check it's not stale */
3162                 ret = check_recovery_lock(ctdb);
3163                 if (ret != 0) {
3164                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3165                         ctdb_set_culprit(rec, ctdb->pnn);
3166                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3167                         return;
3168                 }
3169         }
3170
3171         /* if there are takeover runs requested, perform them and notify the waiters */
3172         if (rec->reallocate_callers) {
3173                 process_ipreallocate_requests(ctdb, rec);
3174         }
3175
3176         /* get the nodemap for all active remote nodes
3177          */
3178         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3179         if (remote_nodemaps == NULL) {
3180                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3181                 return;
3182         }
3183         for(i=0; i<nodemap->num; i++) {
3184                 remote_nodemaps[i] = NULL;
3185         }
3186         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3187                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3188                 return;
3189         } 
3190
3191         /* verify that all other nodes have the same nodemap as we have
3192         */
3193         for (j=0; j<nodemap->num; j++) {
3194                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3195                         continue;
3196                 }
3197
3198                 if (remote_nodemaps[j] == NULL) {
3199                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3200                         ctdb_set_culprit(rec, j);
3201
3202                         return;
3203                 }
3204
3205                 /* if the nodes disagree on how many nodes there are
3206                    then this is a good reason to try recovery
3207                  */
3208                 if (remote_nodemaps[j]->num != nodemap->num) {
3209                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3210                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3211                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3212                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3213                         return;
3214                 }
3215
3216                 /* if the nodes disagree on which nodes exist and are
3217                    active, then that is also a good reason to do recovery
3218                  */
3219                 for (i=0;i<nodemap->num;i++) {
3220                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3221                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3222                                           nodemap->nodes[j].pnn, i, 
3223                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3224                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3225                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3226                                             vnnmap);
3227                                 return;
3228                         }
3229                 }
3230
3231                 /* verify the flags are consistent
3232                 */
3233                 for (i=0; i<nodemap->num; i++) {
3234                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3235                                 continue;
3236                         }
3237                         
3238                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3239                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3240                                   nodemap->nodes[j].pnn, 
3241                                   nodemap->nodes[i].pnn, 
3242                                   remote_nodemaps[j]->nodes[i].flags,
3243                                   nodemap->nodes[i].flags));
3244                                 if (i == j) {
3245                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3246                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3247                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3248                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3249                                                     vnnmap);
3250                                         return;
3251                                 } else {
3252                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3253                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3254                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3255                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3256                                                     vnnmap);
3257                                         return;
3258                                 }
3259                         }
3260                 }
3261         }
3262
3263
3264         /* there had better be the same number of lmasters in the vnnmap
3265            as there are active nodes, or we will have to do a recovery
3266          */
3267         if (vnnmap->size != rec->num_active) {
3268                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3269                           vnnmap->size, rec->num_active));
3270                 ctdb_set_culprit(rec, ctdb->pnn);
3271                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3272                 return;
3273         }
3274
3275         /* verify that all active nodes in the nodemap also exist in 
3276            the vnnmap.
3277          */
3278         for (j=0; j<nodemap->num; j++) {
3279                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3280                         continue;
3281                 }
3282                 if (nodemap->nodes[j].pnn == pnn) {
3283                         continue;
3284                 }
3285
3286                 for (i=0; i<vnnmap->size; i++) {
3287                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3288                                 break;
3289                         }
3290                 }
3291                 if (i == vnnmap->size) {
3292                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but does not exist in the vnnmap\n", 
3293                                   nodemap->nodes[j].pnn));
3294                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3295                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3296                         return;
3297                 }
3298         }
3299
3300         
3301         /* verify that all other nodes have the same vnnmap
3302            and are from the same generation
3303          */
3304         for (j=0; j<nodemap->num; j++) {
3305                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3306                         continue;
3307                 }
3308                 if (nodemap->nodes[j].pnn == pnn) {
3309                         continue;
3310                 }
3311
3312                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3313                                           mem_ctx, &remote_vnnmap);
3314                 if (ret != 0) {
3315                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3316                                   nodemap->nodes[j].pnn));
3317                         return;
3318                 }
3319
3320                 /* verify the vnnmap generation is the same */
3321                 if (vnnmap->generation != remote_vnnmap->generation) {
3322                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3323                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3324                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3325                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3326                         return;
3327                 }
3328
3329                 /* verify the vnnmap size is the same */
3330                 if (vnnmap->size != remote_vnnmap->size) {
3331                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3332                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3333                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3334                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3335                         return;
3336                 }
3337
3338                 /* verify the vnnmap is the same */
3339                 for (i=0;i<vnnmap->size;i++) {
3340                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3341                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3342                                           nodemap->nodes[j].pnn));
3343                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3344                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3345                                             vnnmap);
3346                                 return;
3347                         }
3348                 }
3349         }
3350
3351         /* we might need to change who has what IP assigned */
3352         if (rec->need_takeover_run) {
3353                 uint32_t culprit = (uint32_t)-1;
3354
3355                 rec->need_takeover_run = false;
3356
3357                 /* update the list of public ips that a node can handle for
3358                    all connected nodes
3359                 */
3360                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3361                 if (ret != 0) {
3362                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3363                                          culprit));
3364                         ctdb_set_culprit(rec, culprit);
3365                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3366                         return;
3367                 }
3368
3369                 /* execute the "startrecovery" event script on all nodes */
3370                 ret = run_startrecovery_eventscript(rec, nodemap);
3371                 if (ret!=0) {
3372                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3373                         ctdb_set_culprit(rec, ctdb->pnn);
3374                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3375                         return;
3376                 }
3377
3378                 ret = ctdb_takeover_run(ctdb, nodemap);
3379                 if (ret != 0) {
3380                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3381                         ctdb_set_culprit(rec, ctdb->pnn);
3382                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3383                         return;
3384                 }
3385
3386                 /* execute the "recovered" event script on all nodes */
3387                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3388 #if 0
3389 // we can't check whether the event completed successfully
3390 // since this script WILL fail if the node is in recovery mode
3391 // and if that race happens, the code here would just cause a second
3392 // cascading recovery.
3393                 if (ret!=0) {
3394                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3395                         ctdb_set_culprit(rec, ctdb->pnn);
3396                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3397                 }
3398 #endif
3399         }
3400 }
3401
3402 /*
3403   the main monitoring loop
3404  */
3405 static void monitor_cluster(struct ctdb_context *ctdb)
3406 {
3407         struct ctdb_recoverd *rec;
3408
3409         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3410
3411         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3412         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3413
3414         rec->ctdb = ctdb;
3415
3416         rec->priority_time = timeval_current();
3417
3418         /* register a message port for sending memory dumps */
3419         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3420
3421         /* register a message port for recovery elections */
3422         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3423
3424         /* when nodes are disabled/enabled */
3425         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3426
3427         /* when we are asked to push out a flag change */
3428         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3429
3430         /* register a message port for vacuum fetch */
3431         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3432
3433         /* register a message port for reloadnodes  */
3434         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3435
3436         /* register a message port for performing a takeover run */
3437         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3438
3439         /* register a message port for disabling the ip check for a short while */
3440         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3441
3442         /* register a message port for updating the recovery daemons node assignment for an ip */
3443         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3444
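        /* the recovery daemon's main loop: each iteration gets its own
           throw-away talloc context and runs one monitoring pass */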
3445         for (;;) {
3446                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3447                 if (!mem_ctx) {
3448                         DEBUG(DEBUG_CRIT,(__location__
3449                                           " Failed to create temp context\n"));
3450                         exit(-1);
3451                 }
3452
3453                 main_loop(ctdb, rec, mem_ctx);
3454                 talloc_free(mem_ctx);
3455
3456                 /* we only check for recovery once every recover_interval seconds */
3457                 ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
3458         }
3459 }
3460
3461 /*
3462   event handler for when the main ctdbd dies
3463  */
3464 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3465                                  uint16_t flags, void *private_data)
3466 {
3467         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3468         _exit(1);
3469 }
3470
3471 /*
3472   called regularly to verify that the recovery daemon is still running
3473  */
3474 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3475                               struct timeval yt, void *p)
3476 {
3477         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3478
3479         if (kill(ctdb->recoverd_pid, 0) != 0) {
3480                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3481
3482                 ctdb_stop_recoverd(ctdb);
3483                 ctdb_stop_keepalive(ctdb);
3484                 ctdb_stop_monitoring(ctdb);
3485                 ctdb_release_all_ips(ctdb);
3486                 if (ctdb->methods != NULL) {
3487                         ctdb->methods->shutdown(ctdb);
3488                 }
3489                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3490
3491                 exit(10);       
3492         }
3493
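        /* re-arm the timer so this check runs again in 30 seconds */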
3494         event_add_timed(ctdb->ev, ctdb, 
3495                         timeval_current_ofs(30, 0),
3496                         ctdb_check_recd, ctdb);
3497 }
3498
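/*
  SIGCHLD handler for the recovery daemon: reap all exited children
  with waitpid(WNOHANG) so they do not linger as zombies.
 */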
3499 static void recd_sig_child_handler(struct event_context *ev,
3500         struct signal_event *se, int signum, int count,
3501         void *dont_care, 
3502         void *private_data)
3503 {
3504 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3505         int status;
3506         pid_t pid = -1;
3507
3508         while (pid != 0) {
3509                 pid = waitpid(-1, &status, WNOHANG);
3510                 if (pid == -1) {
3511                         if (errno != ECHILD) {
3512                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3513                         }
3514                         return;
3515                 }
3516                 if (pid > 0) {
3517                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3518                 }
3519         }
3520 }
3521
3522 /*
3523   startup the recovery daemon as a child of the main ctdb daemon
3524  */
3525 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3526 {
3527         int fd[2];
3528         struct signal_event *se;
3529
3530         if (pipe(fd) != 0) {
3531                 return -1;
3532         }
3533
3534         ctdb->ctdbd_pid = getpid();
3535
3536         ctdb->recoverd_pid = fork();
3537         if (ctdb->recoverd_pid == -1) {
3538                 return -1;
3539         }
3540         
3541         if (ctdb->recoverd_pid != 0) {
3542                 close(fd[0]);
3543                 event_add_timed(ctdb->ev, ctdb, 
3544                                 timeval_current_ofs(30, 0),
3545                                 ctdb_check_recd, ctdb);
3546                 return 0;
3547         }
3548
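        /* child: from here on we are the recovery daemon */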
3549         close(fd[1]);
3550
3551         srandom(getpid() ^ time(NULL));
3552
3553         if (switch_from_server_to_client(ctdb) != 0) {
3554                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. Shutting down.\n"));
3555                 exit(1);
3556         }
3557
3558         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3559
3560         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3561                      ctdb_recoverd_parent, &fd[0]);     
3562
3563         /* set up a handler to pick up sigchld */
3564         se = event_add_signal(ctdb->ev, ctdb,
3565                                      SIGCHLD, 0,
3566                                      recd_sig_child_handler,
3567                                      ctdb);
3568         if (se == NULL) {
3569                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3570                 exit(1);
3571         }
3572
3573         monitor_cluster(ctdb);
3574
3575         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3576         return -1;
3577 }
3578
3579 /*
3580   shutdown the recovery daemon
3581  */
3582 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3583 {
3584         if (ctdb->recoverd_pid == 0) {
3585                 return;
3586         }
3587
3588         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3589         kill(ctdb->recoverd_pid, SIGTERM);
3590 }