remove log message we dont need
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
struct ip_reallocate_list {
	/* following entry of the singly linked list, NULL at the tail */
	struct ip_reallocate_list *next;

	/* reply details for this caller
	   NOTE(review): presumably identifies where the completion
	   message is sent - confirm against the users of this list */
	struct rd_memdump_reply *rd;
};
41
/* per-node record of how often the node has been blamed */
struct ctdb_banning_state {
	/* accumulated culprit credits */
	uint32_t count;

	/* time of the most recent report against the node */
	struct timeval last_reported_time;
};
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
/* Both timeout macros expand to an expression that reads a variable
   called 'ctdb' from the scope where they are used, so every caller
   must have a 'struct ctdb_context *ctdb' in scope. */
#define CONTROL_TIMEOUT() \
	timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() \
	timeval_current_ofs(ctdb->tunable.recover_interval, 0)

/* forward declaration */
static void ctdb_restart_recd(struct event_context *ev,
			      struct timed_event *te,
			      struct timeval t,
			      void *private_data);
74
75 /*
76   ban a node for a period of time
77  */
78 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
79 {
80         int ret;
81         struct ctdb_context *ctdb = rec->ctdb;
82         struct ctdb_ban_time bantime;
83        
84         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
85
86         if (!ctdb_validate_pnn(ctdb, pnn)) {
87                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
88                 return;
89         }
90
91         bantime.pnn  = pnn;
92         bantime.time = ban_time;
93
94         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
95         if (ret != 0) {
96                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
97                 return;
98         }
99
100 }
101
/* possible outcomes of a monitoring pass */
enum monitor_result {
	MONITOR_OK,
	MONITOR_RECOVERY_NEEDED,
	MONITOR_ELECTION_NEEDED,
	MONITOR_FAILED
};
103
104
105 /*
106   run the "recovered" eventscript on all nodes
107  */
108 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
109 {
110         TALLOC_CTX *tmp_ctx;
111         uint32_t *nodes;
112
113         tmp_ctx = talloc_new(ctdb);
114         CTDB_NO_MEMORY(ctdb, tmp_ctx);
115
116         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
117         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
118                                         nodes, 0,
119                                         CONTROL_TIMEOUT(), false, tdb_null,
120                                         NULL, NULL,
121                                         NULL) != 0) {
122                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
123
124                 talloc_free(tmp_ctx);
125                 return -1;
126         }
127
128         talloc_free(tmp_ctx);
129         return 0;
130 }
131
132 /*
133   remember the trouble maker
134  */
135 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
136 {
137         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
138         struct ctdb_banning_state *ban_state;
139
140         if (culprit > ctdb->num_nodes) {
141                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
142                 return;
143         }
144
145         if (ctdb->nodes[culprit]->ban_state == NULL) {
146                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
147                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
148
149                 
150         }
151         ban_state = ctdb->nodes[culprit]->ban_state;
152         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
153                 /* this was the first time in a long while this node
154                    misbehaved so we will forgive any old transgressions.
155                 */
156                 ban_state->count = 0;
157         }
158
159         ban_state->count += count;
160         ban_state->last_reported_time = timeval_current();
161         rec->last_culprit_node = culprit;
162 }
163
164 /*
165   remember the trouble maker
166  */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
	/* convenience wrapper: charge exactly one credit */
	const uint32_t single_credit = 1;

	ctdb_set_culprit_count(rec, culprit, single_credit);
}
171
172
173 /* this callback is called for every node that failed to execute the
174    start recovery event
175 */
176 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
177 {
178         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
179
180         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
181
182         ctdb_set_culprit(rec, node_pnn);
183 }
184
185 /*
186   run the "startrecovery" eventscript on all nodes
187  */
188 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
189 {
190         TALLOC_CTX *tmp_ctx;
191         uint32_t *nodes;
192         struct ctdb_context *ctdb = rec->ctdb;
193
194         tmp_ctx = talloc_new(ctdb);
195         CTDB_NO_MEMORY(ctdb, tmp_ctx);
196
197         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
198         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
199                                         nodes, 0,
200                                         CONTROL_TIMEOUT(), false, tdb_null,
201                                         NULL,
202                                         startrecovery_fail_callback,
203                                         rec) != 0) {
204                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
205                 talloc_free(tmp_ctx);
206                 return -1;
207         }
208
209         talloc_free(tmp_ctx);
210         return 0;
211 }
212
213 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
214 {
215         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
216                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
217                 return;
218         }
219         if (node_pnn < ctdb->num_nodes) {
220                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
221         }
222 }
223
224 /*
225   update the node capabilities for all connected nodes
226  */
227 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
228 {
229         uint32_t *nodes;
230         TALLOC_CTX *tmp_ctx;
231
232         tmp_ctx = talloc_new(ctdb);
233         CTDB_NO_MEMORY(ctdb, tmp_ctx);
234
235         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
236         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
237                                         nodes, 0,
238                                         CONTROL_TIMEOUT(),
239                                         false, tdb_null,
240                                         async_getcap_callback, NULL,
241                                         NULL) != 0) {
242                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
243                 talloc_free(tmp_ctx);
244                 return -1;
245         }
246
247         talloc_free(tmp_ctx);
248         return 0;
249 }
250
251 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
252 {
253         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
254
255         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
256         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
257 }
258
259 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
260 {
261         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
262
263         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
264         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
265 }
266
267 /*
268   change recovery mode on all nodes
269  */
270 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
271 {
272         TDB_DATA data;
273         uint32_t *nodes;
274         TALLOC_CTX *tmp_ctx;
275
276         tmp_ctx = talloc_new(ctdb);
277         CTDB_NO_MEMORY(ctdb, tmp_ctx);
278
279         /* freeze all nodes */
280         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
281         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
282                 int i;
283
284                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
285                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
286                                                 nodes, i,
287                                                 CONTROL_TIMEOUT(),
288                                                 false, tdb_null,
289                                                 NULL,
290                                                 set_recmode_fail_callback,
291                                                 rec) != 0) {
292                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
293                                 talloc_free(tmp_ctx);
294                                 return -1;
295                         }
296                 }
297         }
298
299
300         data.dsize = sizeof(uint32_t);
301         data.dptr = (unsigned char *)&rec_mode;
302
303         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
304                                         nodes, 0,
305                                         CONTROL_TIMEOUT(),
306                                         false, data,
307                                         NULL, NULL,
308                                         NULL) != 0) {
309                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
310                 talloc_free(tmp_ctx);
311                 return -1;
312         }
313
314         talloc_free(tmp_ctx);
315         return 0;
316 }
317
/*
  change recovery master on all nodes
 */
321 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
322 {
323         TDB_DATA data;
324         TALLOC_CTX *tmp_ctx;
325         uint32_t *nodes;
326
327         tmp_ctx = talloc_new(ctdb);
328         CTDB_NO_MEMORY(ctdb, tmp_ctx);
329
330         data.dsize = sizeof(uint32_t);
331         data.dptr = (unsigned char *)&pnn;
332
333         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
334         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
335                                         nodes, 0,
336                                         CONTROL_TIMEOUT(), false, data,
337                                         NULL, NULL,
338                                         NULL) != 0) {
339                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
340                 talloc_free(tmp_ctx);
341                 return -1;
342         }
343
344         talloc_free(tmp_ctx);
345         return 0;
346 }
347
/* update all remote nodes to use the same db priority that we have.
   this can fail if the remote node has not yet been upgraded to
   support this function, so we always return success and never fail
   a recovery if this call fails.
*/
353 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
354         struct ctdb_node_map *nodemap, 
355         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
356 {
357         int db;
358         uint32_t *nodes;
359
360         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
361
362         /* step through all local databases */
363         for (db=0; db<dbmap->num;db++) {
364                 TDB_DATA data;
365                 struct ctdb_db_priority db_prio;
366                 int ret;
367
368                 db_prio.db_id     = dbmap->dbs[db].dbid;
369                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
370                 if (ret != 0) {
371                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
372                         continue;
373                 }
374
375                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
376
377                 data.dptr  = (uint8_t *)&db_prio;
378                 data.dsize = sizeof(db_prio);
379
380                 if (ctdb_client_async_control(ctdb,
381                                         CTDB_CONTROL_SET_DB_PRIORITY,
382                                         nodes, 0,
383                                         CONTROL_TIMEOUT(), false, data,
384                                         NULL, NULL,
385                                         NULL) != 0) {
386                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
387                 }
388         }
389
390         return 0;
391 }                       
392
393 /*
394   ensure all other nodes have attached to any databases that we have
395  */
396 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
397                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
398 {
399         int i, j, db, ret;
400         struct ctdb_dbid_map *remote_dbmap;
401
402         /* verify that all other nodes have all our databases */
403         for (j=0; j<nodemap->num; j++) {
404                 /* we dont need to ourself ourselves */
405                 if (nodemap->nodes[j].pnn == pnn) {
406                         continue;
407                 }
408                 /* dont check nodes that are unavailable */
409                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
410                         continue;
411                 }
412
413                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
414                                          mem_ctx, &remote_dbmap);
415                 if (ret != 0) {
416                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
417                         return -1;
418                 }
419
420                 /* step through all local databases */
421                 for (db=0; db<dbmap->num;db++) {
422                         const char *name;
423
424
425                         for (i=0;i<remote_dbmap->num;i++) {
426                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
427                                         break;
428                                 }
429                         }
430                         /* the remote node already have this database */
431                         if (i!=remote_dbmap->num) {
432                                 continue;
433                         }
434                         /* ok so we need to create this database */
435                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
436                                             mem_ctx, &name);
437                         if (ret != 0) {
438                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
439                                 return -1;
440                         }
441                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
442                                            mem_ctx, name, dbmap->dbs[db].persistent);
443                         if (ret != 0) {
444                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
445                                 return -1;
446                         }
447                 }
448         }
449
450         return 0;
451 }
452
453
454 /*
455   ensure we are attached to any databases that anyone else is attached to
456  */
457 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
458                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
459 {
460         int i, j, db, ret;
461         struct ctdb_dbid_map *remote_dbmap;
462
463         /* verify that we have all database any other node has */
464         for (j=0; j<nodemap->num; j++) {
465                 /* we dont need to ourself ourselves */
466                 if (nodemap->nodes[j].pnn == pnn) {
467                         continue;
468                 }
469                 /* dont check nodes that are unavailable */
470                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
471                         continue;
472                 }
473
474                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
475                                          mem_ctx, &remote_dbmap);
476                 if (ret != 0) {
477                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
478                         return -1;
479                 }
480
481                 /* step through all databases on the remote node */
482                 for (db=0; db<remote_dbmap->num;db++) {
483                         const char *name;
484
485                         for (i=0;i<(*dbmap)->num;i++) {
486                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
487                                         break;
488                                 }
489                         }
490                         /* we already have this db locally */
491                         if (i!=(*dbmap)->num) {
492                                 continue;
493                         }
494                         /* ok so we need to create this database and
495                            rebuild dbmap
496                          */
497                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
498                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
499                         if (ret != 0) {
500                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
501                                           nodemap->nodes[j].pnn));
502                                 return -1;
503                         }
504                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
505                                            remote_dbmap->dbs[db].persistent);
506                         if (ret != 0) {
507                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
508                                 return -1;
509                         }
510                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
511                         if (ret != 0) {
512                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
513                                 return -1;
514                         }
515                 }
516         }
517
518         return 0;
519 }
520
521
522 /*
523   pull the remote database contents from one node into the recdb
524  */
525 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
526                                     struct tdb_wrap *recdb, uint32_t dbid,
527                                     bool persistent)
528 {
529         int ret;
530         TDB_DATA outdata;
531         struct ctdb_marshall_buffer *reply;
532         struct ctdb_rec_data *rec;
533         int i;
534         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
535
536         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
537                                CONTROL_TIMEOUT(), &outdata);
538         if (ret != 0) {
539                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
540                 talloc_free(tmp_ctx);
541                 return -1;
542         }
543
544         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
545
546         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
547                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
548                 talloc_free(tmp_ctx);
549                 return -1;
550         }
551         
552         rec = (struct ctdb_rec_data *)&reply->data[0];
553         
554         for (i=0;
555              i<reply->count;
556              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
557                 TDB_DATA key, data;
558                 struct ctdb_ltdb_header *hdr;
559                 TDB_DATA existing;
560                 
561                 key.dptr = &rec->data[0];
562                 key.dsize = rec->keylen;
563                 data.dptr = &rec->data[key.dsize];
564                 data.dsize = rec->datalen;
565                 
566                 hdr = (struct ctdb_ltdb_header *)data.dptr;
567
568                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
569                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
570                         talloc_free(tmp_ctx);
571                         return -1;
572                 }
573
574                 /* fetch the existing record, if any */
575                 existing = tdb_fetch(recdb->tdb, key);
576                 
577                 if (existing.dptr != NULL) {
578                         struct ctdb_ltdb_header header;
579                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
580                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
581                                          (unsigned)existing.dsize, srcnode));
582                                 free(existing.dptr);
583                                 talloc_free(tmp_ctx);
584                                 return -1;
585                         }
586                         header = *(struct ctdb_ltdb_header *)existing.dptr;
587                         free(existing.dptr);
588                         if (!(header.rsn < hdr->rsn ||
589                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
590                                 continue;
591                         }
592                 }
593                 
594                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
595                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
596                         talloc_free(tmp_ctx);
597                         return -1;                              
598                 }
599         }
600
601         talloc_free(tmp_ctx);
602
603         return 0;
604 }
605
606 /*
607   pull all the remote database contents into the recdb
608  */
609 static int pull_remote_database(struct ctdb_context *ctdb,
610                                 struct ctdb_recoverd *rec, 
611                                 struct ctdb_node_map *nodemap, 
612                                 struct tdb_wrap *recdb, uint32_t dbid,
613                                 bool persistent)
614 {
615         int j;
616
617         /* pull all records from all other nodes across onto this node
618            (this merges based on rsn)
619         */
620         for (j=0; j<nodemap->num; j++) {
621                 /* dont merge from nodes that are unavailable */
622                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
623                         continue;
624                 }
625                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
626                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
627                                  nodemap->nodes[j].pnn));
628                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
629                         return -1;
630                 }
631         }
632         
633         return 0;
634 }
635
636
637 /*
638   update flags on all active nodes
639  */
640 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
641 {
642         int ret;
643
644         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
645                 if (ret != 0) {
646                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
647                 return -1;
648         }
649
650         return 0;
651 }
652
653 /*
654   ensure all nodes have the same vnnmap we do
655  */
656 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
657                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
658 {
659         int j, ret;
660
661         /* push the new vnn map out to all the nodes */
662         for (j=0; j<nodemap->num; j++) {
663                 /* dont push to nodes that are unavailable */
664                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
665                         continue;
666                 }
667
668                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
669                 if (ret != 0) {
670                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
671                         return -1;
672                 }
673         }
674
675         return 0;
676 }
677
678
/* state of one vacuum fetch in progress; instances are linked into
   rec->vacuum_info */
struct vacuum_info {
	/* doubly linked list pointers */
	struct vacuum_info *next, *prev;

	/* recovery daemon state that owns the list */
	struct ctdb_recoverd *rec;

	uint32_t srcnode;

	/* database the records belong to */
	struct ctdb_db_context *ctdb_db;

	/* marshalled records; 'count' is decremented as they are consumed */
	struct ctdb_marshall_buffer *recs;

	/* next record to process inside 'recs' */
	struct ctdb_rec_data *r;
};

/* forward declaration: used by vacuum_fetch_callback */
static void vacuum_fetch_next(struct vacuum_info *v);
689
690 /*
691   called when a vacuum fetch has completed - just free it and do the next one
692  */
693 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
694 {
695         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
696         talloc_free(state);
697         vacuum_fetch_next(v);
698 }
699
700
701 /*
702   process the next element from the vacuum list
703 */
704 static void vacuum_fetch_next(struct vacuum_info *v)
705 {
706         struct ctdb_call call;
707         struct ctdb_rec_data *r;
708
709         while (v->recs->count) {
710                 struct ctdb_client_call_state *state;
711                 TDB_DATA data;
712                 struct ctdb_ltdb_header *hdr;
713
714                 ZERO_STRUCT(call);
715                 call.call_id = CTDB_NULL_FUNC;
716                 call.flags = CTDB_IMMEDIATE_MIGRATION;
717                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
718
719                 r = v->r;
720                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
721                 v->recs->count--;
722
723                 call.key.dptr = &r->data[0];
724                 call.key.dsize = r->keylen;
725
726                 /* ensure we don't block this daemon - just skip a record if we can't get
727                    the chainlock */
728                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
729                         continue;
730                 }
731
732                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
733                 if (data.dptr == NULL) {
734                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
735                         continue;
736                 }
737
738                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
739                         free(data.dptr);
740                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
741                         continue;
742                 }
743                 
744                 hdr = (struct ctdb_ltdb_header *)data.dptr;
745                 if (hdr->dmaster == v->rec->ctdb->pnn) {
746                         /* its already local */
747                         free(data.dptr);
748                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
749                         continue;
750                 }
751
752                 free(data.dptr);
753
754                 state = ctdb_call_send(v->ctdb_db, &call);
755                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
756                 if (state == NULL) {
757                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
758                         talloc_free(v);
759                         return;
760                 }
761                 state->async.fn = vacuum_fetch_callback;
762                 state->async.private_data = v;
763                 return;
764         }
765
766         talloc_free(v);
767 }
768
769
770 /*
771   destroy a vacuum info structure
772  */
773 static int vacuum_info_destructor(struct vacuum_info *v)
774 {
775         DLIST_REMOVE(v->rec->vacuum_info, v);
776         return 0;
777 }
778
779
780 /*
781   handler for vacuum fetch
782 */
783 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
784                                  TDB_DATA data, void *private_data)
785 {
786         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
787         struct ctdb_marshall_buffer *recs;
788         int ret, i;
789         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
790         const char *name;
791         struct ctdb_dbid_map *dbmap=NULL;
792         bool persistent = false;
793         struct ctdb_db_context *ctdb_db;
794         struct ctdb_rec_data *r;
795         uint32_t srcnode;
796         struct vacuum_info *v;
797
798         recs = (struct ctdb_marshall_buffer *)data.dptr;
799         r = (struct ctdb_rec_data *)&recs->data[0];
800
801         if (recs->count == 0) {
802                 talloc_free(tmp_ctx);
803                 return;
804         }
805
806         srcnode = r->reqid;
807
808         for (v=rec->vacuum_info;v;v=v->next) {
809                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
810                         /* we're already working on records from this node */
811                         talloc_free(tmp_ctx);
812                         return;
813                 }
814         }
815
816         /* work out if the database is persistent */
817         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
818         if (ret != 0) {
819                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
820                 talloc_free(tmp_ctx);
821                 return;
822         }
823
824         for (i=0;i<dbmap->num;i++) {
825                 if (dbmap->dbs[i].dbid == recs->db_id) {
826                         persistent = dbmap->dbs[i].persistent;
827                         break;
828                 }
829         }
830         if (i == dbmap->num) {
831                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
832                 talloc_free(tmp_ctx);
833                 return;         
834         }
835
836         /* find the name of this database */
837         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
838                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
839                 talloc_free(tmp_ctx);
840                 return;
841         }
842
843         /* attach to it */
844         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
845         if (ctdb_db == NULL) {
846                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
847                 talloc_free(tmp_ctx);
848                 return;
849         }
850
851         v = talloc_zero(rec, struct vacuum_info);
852         if (v == NULL) {
853                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
854                 talloc_free(tmp_ctx);
855                 return;
856         }
857
858         v->rec = rec;
859         v->srcnode = srcnode;
860         v->ctdb_db = ctdb_db;
861         v->recs = talloc_memdup(v, recs, data.dsize);
862         if (v->recs == NULL) {
863                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
864                 talloc_free(v);
865                 talloc_free(tmp_ctx);
866                 return;         
867         }
868         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
869
870         DLIST_ADD(rec->vacuum_info, v);
871
872         talloc_set_destructor(v, vacuum_info_destructor);
873
874         vacuum_fetch_next(v);
875         talloc_free(tmp_ctx);
876 }
877
878
/*
  timer callback used by ctdb_wait_timeout: raise the caller's flag so
  its wait loop terminates
 */
static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
                              struct timeval yt, void *p)
{
        uint32_t *expired_flag = (uint32_t *)p;

        *expired_flag = 1;
}
888
889 /*
890   wait for a given number of seconds
891  */
892 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
893 {
894         uint32_t timed_out = 0;
895         time_t usecs = (secs - (time_t)secs) * 1000000;
896         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
897         while (!timed_out) {
898                 event_loop_once(ctdb->ev);
899         }
900 }
901
902 /*
903   called when an election times out (ends)
904  */
905 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
906                                   struct timeval t, void *p)
907 {
908         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
909         rec->election_timeout = NULL;
910         fast_start = false;
911
912         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
913 }
914
915
916 /*
917   wait for an election to finish. It finished election_timeout seconds after
918   the last election packet is received
919  */
920 static void ctdb_wait_election(struct ctdb_recoverd *rec)
921 {
922         struct ctdb_context *ctdb = rec->ctdb;
923         while (rec->election_timeout) {
924                 event_loop_once(ctdb->ev);
925         }
926 }
927
928 /*
929   Update our local flags from all remote connected nodes. 
930   This is only run when we are or we belive we are the recovery master
931  */
932 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
933 {
934         int j;
935         struct ctdb_context *ctdb = rec->ctdb;
936         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
937
938         /* get the nodemap for all active remote nodes and verify
939            they are the same as for this node
940          */
941         for (j=0; j<nodemap->num; j++) {
942                 struct ctdb_node_map *remote_nodemap=NULL;
943                 int ret;
944
945                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
946                         continue;
947                 }
948                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
949                         continue;
950                 }
951
952                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
953                                            mem_ctx, &remote_nodemap);
954                 if (ret != 0) {
955                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
956                                   nodemap->nodes[j].pnn));
957                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
958                         talloc_free(mem_ctx);
959                         return MONITOR_FAILED;
960                 }
961                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
962                         /* We should tell our daemon about this so it
963                            updates its flags or else we will log the same 
964                            message again in the next iteration of recovery.
965                            Since we are the recovery master we can just as
966                            well update the flags on all nodes.
967                         */
968                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
969                         if (ret != 0) {
970                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
971                                 return -1;
972                         }
973
974                         /* Update our local copy of the flags in the recovery
975                            daemon.
976                         */
977                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
978                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
979                                  nodemap->nodes[j].flags));
980                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
981                 }
982                 talloc_free(remote_nodemap);
983         }
984         talloc_free(mem_ctx);
985         return MONITOR_OK;
986 }
987
988
989 /* Create a new random generation ip. 
990    The generation id can not be the INVALID_GENERATION id
991 */
992 static uint32_t new_generation(void)
993 {
994         uint32_t generation;
995
996         while (1) {
997                 generation = random();
998
999                 if (generation != INVALID_GENERATION) {
1000                         break;
1001                 }
1002         }
1003
1004         return generation;
1005 }
1006
1007
1008 /*
1009   create a temporary working database
1010  */
1011 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1012 {
1013         char *name;
1014         struct tdb_wrap *recdb;
1015         unsigned tdb_flags;
1016
1017         /* open up the temporary recovery database */
1018         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1019                                ctdb->db_directory_state,
1020                                ctdb->pnn);
1021         if (name == NULL) {
1022                 return NULL;
1023         }
1024         unlink(name);
1025
1026         tdb_flags = TDB_NOLOCK;
1027         if (ctdb->valgrinding) {
1028                 tdb_flags |= TDB_NOMMAP;
1029         }
1030         tdb_flags |= TDB_DISALLOW_NESTING;
1031
1032         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1033                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1034         if (recdb == NULL) {
1035                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1036         }
1037
1038         talloc_free(name);
1039
1040         return recdb;
1041 }
1042
1043
1044 /* 
1045    a traverse function for pulling all relevent records from recdb
1046  */
1047 struct recdb_data {
1048         struct ctdb_context *ctdb;
1049         struct ctdb_marshall_buffer *recdata;
1050         uint32_t len;
1051         bool failed;
1052         bool persistent;
1053 };
1054
1055 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1056 {
1057         struct recdb_data *params = (struct recdb_data *)p;
1058         struct ctdb_rec_data *rec;
1059         struct ctdb_ltdb_header *hdr;
1060
1061         /* skip empty records */
1062         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1063                 return 0;
1064         }
1065
1066         /* update the dmaster field to point to us */
1067         hdr = (struct ctdb_ltdb_header *)data.dptr;
1068         if (!params->persistent) {
1069                 hdr->dmaster = params->ctdb->pnn;
1070                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1071         }
1072
1073         /* add the record to the blob ready to send to the nodes */
1074         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1075         if (rec == NULL) {
1076                 params->failed = true;
1077                 return -1;
1078         }
1079         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1080         if (params->recdata == NULL) {
1081                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1082                          rec->length + params->len, params->recdata->count));
1083                 params->failed = true;
1084                 return -1;
1085         }
1086         params->recdata->count++;
1087         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1088         params->len += rec->length;
1089         talloc_free(rec);
1090
1091         return 0;
1092 }
1093
1094 /*
1095   push the recdb database out to all nodes
1096  */
1097 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1098                                bool persistent,
1099                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1100 {
1101         struct recdb_data params;
1102         struct ctdb_marshall_buffer *recdata;
1103         TDB_DATA outdata;
1104         TALLOC_CTX *tmp_ctx;
1105         uint32_t *nodes;
1106
1107         tmp_ctx = talloc_new(ctdb);
1108         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1109
1110         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1111         CTDB_NO_MEMORY(ctdb, recdata);
1112
1113         recdata->db_id = dbid;
1114
1115         params.ctdb = ctdb;
1116         params.recdata = recdata;
1117         params.len = offsetof(struct ctdb_marshall_buffer, data);
1118         params.failed = false;
1119         params.persistent = persistent;
1120
1121         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1122                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1123                 talloc_free(params.recdata);
1124                 talloc_free(tmp_ctx);
1125                 return -1;
1126         }
1127
1128         if (params.failed) {
1129                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1130                 talloc_free(params.recdata);
1131                 talloc_free(tmp_ctx);
1132                 return -1;              
1133         }
1134
1135         recdata = params.recdata;
1136
1137         outdata.dptr = (void *)recdata;
1138         outdata.dsize = params.len;
1139
1140         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1141         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1142                                         nodes, 0,
1143                                         CONTROL_TIMEOUT(), false, outdata,
1144                                         NULL, NULL,
1145                                         NULL) != 0) {
1146                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1147                 talloc_free(recdata);
1148                 talloc_free(tmp_ctx);
1149                 return -1;
1150         }
1151
1152         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1153                   dbid, recdata->count));
1154
1155         talloc_free(recdata);
1156         talloc_free(tmp_ctx);
1157
1158         return 0;
1159 }
1160
1161
1162 /*
1163   go through a full recovery on one database 
1164  */
1165 static int recover_database(struct ctdb_recoverd *rec, 
1166                             TALLOC_CTX *mem_ctx,
1167                             uint32_t dbid,
1168                             bool persistent,
1169                             uint32_t pnn, 
1170                             struct ctdb_node_map *nodemap,
1171                             uint32_t transaction_id)
1172 {
1173         struct tdb_wrap *recdb;
1174         int ret;
1175         struct ctdb_context *ctdb = rec->ctdb;
1176         TDB_DATA data;
1177         struct ctdb_control_wipe_database w;
1178         uint32_t *nodes;
1179
1180         recdb = create_recdb(ctdb, mem_ctx);
1181         if (recdb == NULL) {
1182                 return -1;
1183         }
1184
1185         /* pull all remote databases onto the recdb */
1186         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1187         if (ret != 0) {
1188                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1189                 return -1;
1190         }
1191
1192         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1193
1194         /* wipe all the remote databases. This is safe as we are in a transaction */
1195         w.db_id = dbid;
1196         w.transaction_id = transaction_id;
1197
1198         data.dptr = (void *)&w;
1199         data.dsize = sizeof(w);
1200
1201         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1202         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1203                                         nodes, 0,
1204                                         CONTROL_TIMEOUT(), false, data,
1205                                         NULL, NULL,
1206                                         NULL) != 0) {
1207                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1208                 talloc_free(recdb);
1209                 return -1;
1210         }
1211         
1212         /* push out the correct database. This sets the dmaster and skips 
1213            the empty records */
1214         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1215         if (ret != 0) {
1216                 talloc_free(recdb);
1217                 return -1;
1218         }
1219
1220         /* all done with this database */
1221         talloc_free(recdb);
1222
1223         return 0;
1224 }
1225
1226 /*
1227   reload the nodes file 
1228 */
1229 static void reload_nodes_file(struct ctdb_context *ctdb)
1230 {
1231         ctdb->nodes = NULL;
1232         ctdb_load_nodes_file(ctdb);
1233 }
1234
1235 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1236                                          struct ctdb_recoverd *rec,
1237                                          struct ctdb_node_map *nodemap,
1238                                          uint32_t *culprit)
1239 {
1240         int j;
1241         int ret;
1242
1243         if (ctdb->num_nodes != nodemap->num) {
1244                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1245                                   ctdb->num_nodes, nodemap->num));
1246                 if (culprit) {
1247                         *culprit = ctdb->pnn;
1248                 }
1249                 return -1;
1250         }
1251
1252         for (j=0; j<nodemap->num; j++) {
1253                 /* release any existing data */
1254                 if (ctdb->nodes[j]->known_public_ips) {
1255                         talloc_free(ctdb->nodes[j]->known_public_ips);
1256                         ctdb->nodes[j]->known_public_ips = NULL;
1257                 }
1258                 if (ctdb->nodes[j]->available_public_ips) {
1259                         talloc_free(ctdb->nodes[j]->available_public_ips);
1260                         ctdb->nodes[j]->available_public_ips = NULL;
1261                 }
1262
1263                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1264                         continue;
1265                 }
1266
1267                 /* grab a new shiny list of public ips from the node */
1268                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1269                                         CONTROL_TIMEOUT(),
1270                                         ctdb->nodes[j]->pnn,
1271                                         ctdb->nodes,
1272                                         0,
1273                                         &ctdb->nodes[j]->known_public_ips);
1274                 if (ret != 0) {
1275                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1276                                 ctdb->nodes[j]->pnn));
1277                         if (culprit) {
1278                                 *culprit = ctdb->nodes[j]->pnn;
1279                         }
1280                         return -1;
1281                 }
1282
1283                 if (ctdb->tunable.disable_ip_failover == 0) {
1284                         if (rec->ip_check_disable_ctx == NULL) {
1285                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1286                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1287                                         rec->need_takeover_run = true;
1288                                 }
1289                         }
1290                 }
1291
1292                 /* grab a new shiny list of public ips from the node */
1293                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1294                                         CONTROL_TIMEOUT(),
1295                                         ctdb->nodes[j]->pnn,
1296                                         ctdb->nodes,
1297                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1298                                         &ctdb->nodes[j]->available_public_ips);
1299                 if (ret != 0) {
1300                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1301                                 ctdb->nodes[j]->pnn));
1302                         if (culprit) {
1303                                 *culprit = ctdb->nodes[j]->pnn;
1304                         }
1305                         return -1;
1306                 }
1307         }
1308
1309         return 0;
1310 }
1311
1312 /* when we start a recovery, make sure all nodes use the same reclock file
1313    setting
1314 */
1315 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1316 {
1317         struct ctdb_context *ctdb = rec->ctdb;
1318         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1319         TDB_DATA data;
1320         uint32_t *nodes;
1321
1322         if (ctdb->recovery_lock_file == NULL) {
1323                 data.dptr  = NULL;
1324                 data.dsize = 0;
1325         } else {
1326                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1327                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1328         }
1329
1330         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1331         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1332                                         nodes, 0,
1333                                         CONTROL_TIMEOUT(),
1334                                         false, data,
1335                                         NULL, NULL,
1336                                         rec) != 0) {
1337                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1338                 talloc_free(tmp_ctx);
1339                 return -1;
1340         }
1341
1342         talloc_free(tmp_ctx);
1343         return 0;
1344 }
1345
1346
1347 /*
1348   we are the recmaster, and recovery is needed - start a recovery run
1349  */
1350 static int do_recovery(struct ctdb_recoverd *rec, 
1351                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1352                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1353 {
1354         struct ctdb_context *ctdb = rec->ctdb;
1355         int i, j, ret;
1356         uint32_t generation;
1357         struct ctdb_dbid_map *dbmap;
1358         TDB_DATA data;
1359         uint32_t *nodes;
1360         struct timeval start_time;
1361         uint32_t culprit = (uint32_t)-1;
1362
1363         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1364
1365         /* if recovery fails, force it again */
1366         rec->need_recovery = true;
1367
1368         for (i=0; i<ctdb->num_nodes; i++) {
1369                 struct ctdb_banning_state *ban_state;
1370
1371                 if (ctdb->nodes[i]->ban_state == NULL) {
1372                         continue;
1373                 }
1374                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1375                 if (ban_state->count < 2*ctdb->num_nodes) {
1376                         continue;
1377                 }
1378                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1379                         ctdb->nodes[i]->pnn, ban_state->count,
1380                         ctdb->tunable.recovery_ban_period));
1381                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1382                 ban_state->count = 0;
1383         }
1384
1385
1386         if (ctdb->tunable.verify_recovery_lock != 0) {
1387                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1388                 start_time = timeval_current();
1389                 if (!ctdb_recovery_lock(ctdb, true)) {
1390                         ctdb_set_culprit(rec, pnn);
1391                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1392                         return -1;
1393                 }
1394                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1395                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1396         }
1397
1398         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1399
1400         /* get a list of all databases */
1401         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1402         if (ret != 0) {
1403                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1404                 return -1;
1405         }
1406
1407         /* we do the db creation before we set the recovery mode, so the freeze happens
1408            on all databases we will be dealing with. */
1409
1410         /* verify that we have all the databases any other node has */
1411         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1412         if (ret != 0) {
1413                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1414                 return -1;
1415         }
1416
1417         /* verify that all other nodes have all our databases */
1418         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1419         if (ret != 0) {
1420                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1421                 return -1;
1422         }
1423         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1424
1425         /* update the database priority for all remote databases */
1426         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1427         if (ret != 0) {
1428                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1429         }
1430         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1431
1432
1433         /* update all other nodes to use the same setting for reclock files
1434            as the local recovery master.
1435         */
1436         sync_recovery_lock_file_across_cluster(rec);
1437
1438         /* set recovery mode to active on all nodes */
1439         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1440         if (ret != 0) {
1441                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1442                 return -1;
1443         }
1444
1445         /* execute the "startrecovery" event script on all nodes */
1446         ret = run_startrecovery_eventscript(rec, nodemap);
1447         if (ret!=0) {
1448                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1449                 return -1;
1450         }
1451
1452         /*
1453           update all nodes to have the same flags that we have
1454          */
1455         for (i=0;i<nodemap->num;i++) {
1456                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1457                         continue;
1458                 }
1459
1460                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1461                 if (ret != 0) {
1462                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1463                         return -1;
1464                 }
1465         }
1466
1467         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1468
1469         /* pick a new generation number */
1470         generation = new_generation();
1471
1472         /* change the vnnmap on this node to use the new generation 
1473            number but not on any other nodes.
1474            this guarantees that if we abort the recovery prematurely
1475            for some reason (a node stops responding?)
1476            that we can just return immediately and we will reenter
1477            recovery shortly again.
1478            I.e. we deliberately leave the cluster with an inconsistent
1479            generation id to allow us to abort recovery at any stage and
1480            just restart it from scratch.
1481          */
1482         vnnmap->generation = generation;
1483         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1484         if (ret != 0) {
1485                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1486                 return -1;
1487         }
1488
1489         data.dptr = (void *)&generation;
1490         data.dsize = sizeof(uint32_t);
1491
1492         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1493         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1494                                         nodes, 0,
1495                                         CONTROL_TIMEOUT(), false, data,
1496                                         NULL,
1497                                         transaction_start_fail_callback,
1498                                         rec) != 0) {
1499                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1500                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1501                                         nodes, 0,
1502                                         CONTROL_TIMEOUT(), false, tdb_null,
1503                                         NULL,
1504                                         NULL,
1505                                         NULL) != 0) {
1506                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1507                 }
1508                 return -1;
1509         }
1510
1511         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1512
1513         for (i=0;i<dbmap->num;i++) {
1514                 ret = recover_database(rec, mem_ctx,
1515                                        dbmap->dbs[i].dbid,
1516                                        dbmap->dbs[i].persistent,
1517                                        pnn, nodemap, generation);
1518                 if (ret != 0) {
1519                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1520                         return -1;
1521                 }
1522         }
1523
1524         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1525
1526         /* commit all the changes */
1527         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1528                                         nodes, 0,
1529                                         CONTROL_TIMEOUT(), false, data,
1530                                         NULL, NULL,
1531                                         NULL) != 0) {
1532                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1533                 return -1;
1534         }
1535
1536         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1537         
1538
1539         /* update the capabilities for all nodes */
1540         ret = update_capabilities(ctdb, nodemap);
1541         if (ret!=0) {
1542                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1543                 return -1;
1544         }
1545
1546         /* build a new vnn map with all the currently active and
1547            unbanned nodes */
1548         generation = new_generation();
1549         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1550         CTDB_NO_MEMORY(ctdb, vnnmap);
1551         vnnmap->generation = generation;
1552         vnnmap->size = 0;
1553         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1554         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1555         for (i=j=0;i<nodemap->num;i++) {
1556                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1557                         continue;
1558                 }
1559                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1560                         /* this node can not be an lmaster */
1561                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1562                         continue;
1563                 }
1564
1565                 vnnmap->size++;
1566                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1567                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1568                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1569
1570         }
1571         if (vnnmap->size == 0) {
1572                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1573                 vnnmap->size++;
1574                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1575                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1576                 vnnmap->map[0] = pnn;
1577         }       
1578
1579         /* update to the new vnnmap on all nodes */
1580         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1581         if (ret != 0) {
1582                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1583                 return -1;
1584         }
1585
1586         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1587
1588         /* update recmaster to point to us for all nodes */
1589         ret = set_recovery_master(ctdb, nodemap, pnn);
1590         if (ret!=0) {
1591                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1592                 return -1;
1593         }
1594
1595         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1596
1597         /*
1598           update all nodes to have the same flags that we have
1599          */
1600         for (i=0;i<nodemap->num;i++) {
1601                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1602                         continue;
1603                 }
1604
1605                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1606                 if (ret != 0) {
1607                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1608                         return -1;
1609                 }
1610         }
1611
1612         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1613
1614         /* disable recovery mode */
1615         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1616         if (ret != 0) {
1617                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1618                 return -1;
1619         }
1620
1621         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1622
1623         /*
1624           tell nodes to takeover their public IPs
1625          */
1626         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1627         if (ret != 0) {
1628                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1629                                  culprit));
1630                 rec->need_takeover_run = true;
1631                 return -1;
1632         }
1633         rec->need_takeover_run = false;
1634         ret = ctdb_takeover_run(ctdb, nodemap);
1635         if (ret != 0) {
1636                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1637                 rec->need_takeover_run = true;
1638         }
1639
1640         /* execute the "recovered" event script on all nodes */
1641         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1642         if (ret!=0) {
1643                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1644                 return -1;
1645         }
1646
1647         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1648
1649         /* send a message to all clients telling them that the cluster 
1650            has been reconfigured */
1651         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1652
1653         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1654
1655         rec->need_recovery = false;
1656
1657         /* we managed to complete a full recovery, make sure to forgive
1658            any past sins by the nodes that could now participate in the
1659            recovery.
1660         */
1661         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1662         for (i=0;i<nodemap->num;i++) {
1663                 struct ctdb_banning_state *ban_state;
1664
1665                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1666                         continue;
1667                 }
1668
1669                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1670                 if (ban_state == NULL) {
1671                         continue;
1672                 }
1673
1674                 ban_state->count = 0;
1675         }
1676
1677
1678         /* We just finished a recovery successfully. 
1679            We now wait for rerecovery_timeout before we allow 
1680            another recovery to take place.
1681         */
1682         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1683         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1684         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1685
1686         return 0;
1687 }
1688
1689
/*
  payload of an election broadcast.
  candidates are ranked on the number of connected nodes first, then
  on the priority time and finally on the pnn
 */
struct election_message {
	uint32_t       num_connected;	/* nodes the sender does not see as disconnected */
	struct timeval priority_time;	/* priority timestamp of the sender */
	uint32_t       pnn;		/* node number of the sender */
	uint32_t       node_flags;	/* the sender's own node flags */
};
1700
1701 /*
1702   form this nodes election data
1703  */
1704 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1705 {
1706         int ret, i;
1707         struct ctdb_node_map *nodemap;
1708         struct ctdb_context *ctdb = rec->ctdb;
1709
1710         ZERO_STRUCTP(em);
1711
1712         em->pnn = rec->ctdb->pnn;
1713         em->priority_time = rec->priority_time;
1714
1715         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1716         if (ret != 0) {
1717                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1718                 return;
1719         }
1720
1721         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1722         em->node_flags = rec->node_flags;
1723
1724         for (i=0;i<nodemap->num;i++) {
1725                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1726                         em->num_connected++;
1727                 }
1728         }
1729
1730         /* we shouldnt try to win this election if we cant be a recmaster */
1731         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1732                 em->num_connected = 0;
1733                 em->priority_time = timeval_current();
1734         }
1735
1736         talloc_free(nodemap);
1737 }
1738
1739 /*
1740   see if the given election data wins
1741  */
1742 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1743 {
1744         struct election_message myem;
1745         int cmp = 0;
1746
1747         ctdb_election_data(rec, &myem);
1748
1749         /* we cant win if we dont have the recmaster capability */
1750         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1751                 return false;
1752         }
1753
1754         /* we cant win if we are banned */
1755         if (rec->node_flags & NODE_FLAGS_BANNED) {
1756                 return false;
1757         }       
1758
1759         /* we cant win if we are stopped */
1760         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1761                 return false;
1762         }       
1763
1764         /* we will automatically win if the other node is banned */
1765         if (em->node_flags & NODE_FLAGS_BANNED) {
1766                 return true;
1767         }
1768
1769         /* we will automatically win if the other node is banned */
1770         if (em->node_flags & NODE_FLAGS_STOPPED) {
1771                 return true;
1772         }
1773
1774         /* try to use the most connected node */
1775         if (cmp == 0) {
1776                 cmp = (int)myem.num_connected - (int)em->num_connected;
1777         }
1778
1779         /* then the longest running node */
1780         if (cmp == 0) {
1781                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1782         }
1783
1784         if (cmp == 0) {
1785                 cmp = (int)myem.pnn - (int)em->pnn;
1786         }
1787
1788         return cmp > 0;
1789 }
1790
1791 /*
1792   send out an election request
1793  */
1794 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1795 {
1796         int ret;
1797         TDB_DATA election_data;
1798         struct election_message emsg;
1799         uint64_t srvid;
1800         struct ctdb_context *ctdb = rec->ctdb;
1801
1802         srvid = CTDB_SRVID_RECOVERY;
1803
1804         ctdb_election_data(rec, &emsg);
1805
1806         election_data.dsize = sizeof(struct election_message);
1807         election_data.dptr  = (unsigned char *)&emsg;
1808
1809
1810         /* send an election message to all active nodes */
1811         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1812         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1813
1814
1815         /* A new node that is already frozen has entered the cluster.
1816            The existing nodes are not frozen and dont need to be frozen
1817            until the election has ended and we start the actual recovery
1818         */
1819         if (update_recmaster == true) {
1820                 /* first we assume we will win the election and set 
1821                    recoverymaster to be ourself on the current node
1822                  */
1823                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1824                 if (ret != 0) {
1825                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1826                         return -1;
1827                 }
1828         }
1829
1830
1831         return 0;
1832 }
1833
1834 /*
1835   this function will unban all nodes in the cluster
1836 */
1837 static void unban_all_nodes(struct ctdb_context *ctdb)
1838 {
1839         int ret, i;
1840         struct ctdb_node_map *nodemap;
1841         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1842         
1843         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1844         if (ret != 0) {
1845                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1846                 return;
1847         }
1848
1849         for (i=0;i<nodemap->num;i++) {
1850                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1851                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1852                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1853                 }
1854         }
1855
1856         talloc_free(tmp_ctx);
1857 }
1858
1859
1860 /*
1861   we think we are winning the election - send a broadcast election request
1862  */
1863 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1864 {
1865         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1866         int ret;
1867
1868         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1869         if (ret != 0) {
1870                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1871         }
1872
1873         talloc_free(rec->send_election_te);
1874         rec->send_election_te = NULL;
1875 }
1876
1877 /*
1878   handler for memory dumps
1879 */
1880 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1881                              TDB_DATA data, void *private_data)
1882 {
1883         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1884         TDB_DATA *dump;
1885         int ret;
1886         struct rd_memdump_reply *rd;
1887
1888         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1889                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1890                 talloc_free(tmp_ctx);
1891                 return;
1892         }
1893         rd = (struct rd_memdump_reply *)data.dptr;
1894
1895         dump = talloc_zero(tmp_ctx, TDB_DATA);
1896         if (dump == NULL) {
1897                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1898                 talloc_free(tmp_ctx);
1899                 return;
1900         }
1901         ret = ctdb_dump_memory(ctdb, dump);
1902         if (ret != 0) {
1903                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1904                 talloc_free(tmp_ctx);
1905                 return;
1906         }
1907
1908 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1909
1910         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1911         if (ret != 0) {
1912                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1913                 talloc_free(tmp_ctx);
1914                 return;
1915         }
1916
1917         talloc_free(tmp_ctx);
1918 }
1919
1920 /*
1921   handler for reload_nodes
1922 */
1923 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1924                              TDB_DATA data, void *private_data)
1925 {
1926         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1927
1928         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1929
1930         reload_nodes_file(rec->ctdb);
1931 }
1932
1933
1934 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1935                               struct timeval yt, void *p)
1936 {
1937         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1938
1939         talloc_free(rec->ip_check_disable_ctx);
1940         rec->ip_check_disable_ctx = NULL;
1941 }
1942
1943
1944 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1945                              TDB_DATA data, void *private_data)
1946 {
1947         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1948         struct ctdb_public_ip *ip;
1949
1950         if (rec->recmaster != rec->ctdb->pnn) {
1951                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1952                 return;
1953         }
1954
1955         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1956                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1957                 return;
1958         }
1959
1960         ip = (struct ctdb_public_ip *)data.dptr;
1961
1962         update_ip_assignment_tree(rec->ctdb, ip);
1963 }
1964
1965
1966 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1967                              TDB_DATA data, void *private_data)
1968 {
1969         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1970         uint32_t timeout;
1971
1972         if (rec->ip_check_disable_ctx != NULL) {
1973                 talloc_free(rec->ip_check_disable_ctx);
1974                 rec->ip_check_disable_ctx = NULL;
1975         }
1976
1977         if (data.dsize != sizeof(uint32_t)) {
1978                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1979                                  "expexting %lu\n", (long unsigned)data.dsize,
1980                                  (long unsigned)sizeof(uint32_t)));
1981                 return;
1982         }
1983         if (data.dptr == NULL) {
1984                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
1985                 return;
1986         }
1987
1988         timeout = *((uint32_t *)data.dptr);
1989         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1990
1991         rec->ip_check_disable_ctx = talloc_new(rec);
1992         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1993
1994         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1995 }
1996
1997
1998 /*
1999   handler for ip reallocate, just add it to the list of callers and 
2000   handle this later in the monitor_cluster loop so we do not recurse
2001   with other callers to takeover_run()
2002 */
2003 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2004                              TDB_DATA data, void *private_data)
2005 {
2006         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2007         struct ip_reallocate_list *caller;
2008
2009         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2010                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2011                 return;
2012         }
2013
2014         if (rec->ip_reallocate_ctx == NULL) {
2015                 rec->ip_reallocate_ctx = talloc_new(rec);
2016                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2017         }
2018
2019         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2020         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2021
2022         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2023         caller->next = rec->reallocate_callers;
2024         rec->reallocate_callers = caller;
2025
2026         return;
2027 }
2028
2029 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2030 {
2031         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2032         TDB_DATA result;
2033         int32_t ret;
2034         struct ip_reallocate_list *callers;
2035         uint32_t culprit;
2036
2037         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2038
2039         /* update the list of public ips that a node can handle for
2040            all connected nodes
2041         */
2042         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2043         if (ret != 0) {
2044                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2045                                  culprit));
2046                 rec->need_takeover_run = true;
2047         }
2048         if (ret == 0) {
2049                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2050                 if (ret != 0) {
2051                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2052                         rec->need_takeover_run = true;
2053                 }
2054         }
2055
2056         result.dsize = sizeof(int32_t);
2057         result.dptr  = (uint8_t *)&ret;
2058
2059         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2060
2061                 /* Someone that sent srvid==0 does not want a reply */
2062                 if (callers->rd->srvid == 0) {
2063                         continue;
2064                 }
2065                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2066                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2067                                   (unsigned long long)callers->rd->srvid));
2068                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2069                 if (ret != 0) {
2070                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2071                                          "message to %u:%llu\n",
2072                                          (unsigned)callers->rd->pnn,
2073                                          (unsigned long long)callers->rd->srvid));
2074                 }
2075         }
2076
2077         talloc_free(tmp_ctx);
2078         talloc_free(rec->ip_reallocate_ctx);
2079         rec->ip_reallocate_ctx = NULL;
2080         rec->reallocate_callers = NULL;
2081         
2082 }
2083
2084
2085 /*
2086   handler for recovery master elections
2087 */
2088 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2089                              TDB_DATA data, void *private_data)
2090 {
2091         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2092         int ret;
2093         struct election_message *em = (struct election_message *)data.dptr;
2094         TALLOC_CTX *mem_ctx;
2095
2096         /* we got an election packet - update the timeout for the election */
2097         talloc_free(rec->election_timeout);
2098         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2099                                                 fast_start ?
2100                                                 timeval_current_ofs(0, 500000) :
2101                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2102                                                 ctdb_election_timeout, rec);
2103
2104         mem_ctx = talloc_new(ctdb);
2105
2106         /* someone called an election. check their election data
2107            and if we disagree and we would rather be the elected node, 
2108            send a new election message to all other nodes
2109          */
2110         if (ctdb_election_win(rec, em)) {
2111                 if (!rec->send_election_te) {
2112                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2113                                                                 timeval_current_ofs(0, 500000),
2114                                                                 election_send_request, rec);
2115                 }
2116                 talloc_free(mem_ctx);
2117                 /*unban_all_nodes(ctdb);*/
2118                 return;
2119         }
2120         
2121         /* we didn't win */
2122         talloc_free(rec->send_election_te);
2123         rec->send_election_te = NULL;
2124
2125         if (ctdb->tunable.verify_recovery_lock != 0) {
2126                 /* release the recmaster lock */
2127                 if (em->pnn != ctdb->pnn &&
2128                     ctdb->recovery_lock_fd != -1) {
2129                         close(ctdb->recovery_lock_fd);
2130                         ctdb->recovery_lock_fd = -1;
2131                         unban_all_nodes(ctdb);
2132                 }
2133         }
2134
2135         /* ok, let that guy become recmaster then */
2136         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2137         if (ret != 0) {
2138                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2139                 talloc_free(mem_ctx);
2140                 return;
2141         }
2142
2143         talloc_free(mem_ctx);
2144         return;
2145 }
2146
2147
2148 /*
2149   force the start of the election process
2150  */
2151 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2152                            struct ctdb_node_map *nodemap)
2153 {
2154         int ret;
2155         struct ctdb_context *ctdb = rec->ctdb;
2156
2157         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2158
2159         /* set all nodes to recovery mode to stop all internode traffic */
2160         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2161         if (ret != 0) {
2162                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2163                 return;
2164         }
2165
2166         talloc_free(rec->election_timeout);
2167         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2168                                                 fast_start ?
2169                                                 timeval_current_ofs(0, 500000) :
2170                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2171                                                 ctdb_election_timeout, rec);
2172
2173         ret = send_election_request(rec, pnn, true);
2174         if (ret!=0) {
2175                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2176                 return;
2177         }
2178
2179         /* wait for a few seconds to collect all responses */
2180         ctdb_wait_election(rec);
2181 }
2182
2183
2184
2185 /*
2186   handler for when a node changes its flags
2187 */
2188 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2189                             TDB_DATA data, void *private_data)
2190 {
2191         int ret;
2192         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2193         struct ctdb_node_map *nodemap=NULL;
2194         TALLOC_CTX *tmp_ctx;
2195         uint32_t changed_flags;
2196         int i;
2197         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2198         int disabled_flag_changed;
2199
2200         if (data.dsize != sizeof(*c)) {
2201                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2202                 return;
2203         }
2204
2205         tmp_ctx = talloc_new(ctdb);
2206         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2207
2208         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2209         if (ret != 0) {
2210                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2211                 talloc_free(tmp_ctx);
2212                 return;         
2213         }
2214
2215
2216         for (i=0;i<nodemap->num;i++) {
2217                 if (nodemap->nodes[i].pnn == c->pnn) break;
2218         }
2219
2220         if (i == nodemap->num) {
2221                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2222                 talloc_free(tmp_ctx);
2223                 return;
2224         }
2225
2226         changed_flags = c->old_flags ^ c->new_flags;
2227
2228         if (nodemap->nodes[i].flags != c->new_flags) {
2229                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2230         }
2231
2232         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2233
2234         nodemap->nodes[i].flags = c->new_flags;
2235
2236         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2237                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2238
2239         if (ret == 0) {
2240                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2241                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2242         }
2243         
2244         if (ret == 0 &&
2245             ctdb->recovery_master == ctdb->pnn &&
2246             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2247                 /* Only do the takeover run if the perm disabled or unhealthy
2248                    flags changed since these will cause an ip failover but not
2249                    a recovery.
2250                    If the node became disconnected or banned this will also
2251                    lead to an ip address failover but that is handled 
2252                    during recovery
2253                 */
2254                 if (disabled_flag_changed) {
2255                         rec->need_takeover_run = true;
2256                 }
2257         }
2258
2259         talloc_free(tmp_ctx);
2260 }
2261
2262 /*
2263   handler for when we need to push out flag changes ot all other nodes
2264 */
2265 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2266                             TDB_DATA data, void *private_data)
2267 {
2268         int ret;
2269         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2270         struct ctdb_node_map *nodemap=NULL;
2271         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2272         uint32_t recmaster;
2273         uint32_t *nodes;
2274
2275         /* find the recovery master */
2276         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2277         if (ret != 0) {
2278                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2279                 talloc_free(tmp_ctx);
2280                 return;
2281         }
2282
2283         /* read the node flags from the recmaster */
2284         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2285         if (ret != 0) {
2286                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2287                 talloc_free(tmp_ctx);
2288                 return;
2289         }
2290         if (c->pnn >= nodemap->num) {
2291                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2292                 talloc_free(tmp_ctx);
2293                 return;
2294         }
2295
2296         /* send the flags update to all connected nodes */
2297         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2298
2299         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2300                                       nodes, 0, CONTROL_TIMEOUT(),
2301                                       false, data,
2302                                       NULL, NULL,
2303                                       NULL) != 0) {
2304                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2305
2306                 talloc_free(tmp_ctx);
2307                 return;
2308         }
2309
2310         talloc_free(tmp_ctx);
2311 }
2312
2313
2314 struct verify_recmode_normal_data {
2315         uint32_t count;
2316         enum monitor_result status;
2317 };
2318
2319 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2320 {
2321         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2322
2323
2324         /* one more node has responded with recmode data*/
2325         rmdata->count--;
2326
2327         /* if we failed to get the recmode, then return an error and let
2328            the main loop try again.
2329         */
2330         if (state->state != CTDB_CONTROL_DONE) {
2331                 if (rmdata->status == MONITOR_OK) {
2332                         rmdata->status = MONITOR_FAILED;
2333                 }
2334                 return;
2335         }
2336
2337         /* if we got a response, then the recmode will be stored in the
2338            status field
2339         */
2340         if (state->status != CTDB_RECOVERY_NORMAL) {
2341                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2342                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2343         }
2344
2345         return;
2346 }
2347
2348
2349 /* verify that all nodes are in normal recovery mode */
2350 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2351 {
2352         struct verify_recmode_normal_data *rmdata;
2353         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2354         struct ctdb_client_control_state *state;
2355         enum monitor_result status;
2356         int j;
2357         
2358         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2359         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2360         rmdata->count  = 0;
2361         rmdata->status = MONITOR_OK;
2362
2363         /* loop over all active nodes and send an async getrecmode call to 
2364            them*/
2365         for (j=0; j<nodemap->num; j++) {
2366                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2367                         continue;
2368                 }
2369                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2370                                         CONTROL_TIMEOUT(), 
2371                                         nodemap->nodes[j].pnn);
2372                 if (state == NULL) {
2373                         /* we failed to send the control, treat this as 
2374                            an error and try again next iteration
2375                         */                      
2376                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2377                         talloc_free(mem_ctx);
2378                         return MONITOR_FAILED;
2379                 }
2380
2381                 /* set up the callback functions */
2382                 state->async.fn = verify_recmode_normal_callback;
2383                 state->async.private_data = rmdata;
2384
2385                 /* one more control to wait for to complete */
2386                 rmdata->count++;
2387         }
2388
2389
2390         /* now wait for up to the maximum number of seconds allowed
2391            or until all nodes we expect a response from has replied
2392         */
2393         while (rmdata->count > 0) {
2394                 event_loop_once(ctdb->ev);
2395         }
2396
2397         status = rmdata->status;
2398         talloc_free(mem_ctx);
2399         return status;
2400 }
2401
2402
2403 struct verify_recmaster_data {
2404         struct ctdb_recoverd *rec;
2405         uint32_t count;
2406         uint32_t pnn;
2407         enum monitor_result status;
2408 };
2409
2410 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2411 {
2412         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2413
2414
2415         /* one more node has responded with recmaster data*/
2416         rmdata->count--;
2417
2418         /* if we failed to get the recmaster, then return an error and let
2419            the main loop try again.
2420         */
2421         if (state->state != CTDB_CONTROL_DONE) {
2422                 if (rmdata->status == MONITOR_OK) {
2423                         rmdata->status = MONITOR_FAILED;
2424                 }
2425                 return;
2426         }
2427
2428         /* if we got a response, then the recmaster will be stored in the
2429            status field
2430         */
2431         if (state->status != rmdata->pnn) {
2432                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2433                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2434                 rmdata->status = MONITOR_ELECTION_NEEDED;
2435         }
2436
2437         return;
2438 }
2439
2440
2441 /* verify that all nodes agree that we are the recmaster */
2442 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2443 {
2444         struct ctdb_context *ctdb = rec->ctdb;
2445         struct verify_recmaster_data *rmdata;
2446         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2447         struct ctdb_client_control_state *state;
2448         enum monitor_result status;
2449         int j;
2450         
2451         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2452         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2453         rmdata->rec    = rec;
2454         rmdata->count  = 0;
2455         rmdata->pnn    = pnn;
2456         rmdata->status = MONITOR_OK;
2457
2458         /* loop over all active nodes and send an async getrecmaster call to 
2459            them*/
2460         for (j=0; j<nodemap->num; j++) {
2461                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2462                         continue;
2463                 }
2464                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2465                                         CONTROL_TIMEOUT(),
2466                                         nodemap->nodes[j].pnn);
2467                 if (state == NULL) {
2468                         /* we failed to send the control, treat this as 
2469                            an error and try again next iteration
2470                         */                      
2471                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2472                         talloc_free(mem_ctx);
2473                         return MONITOR_FAILED;
2474                 }
2475
2476                 /* set up the callback functions */
2477                 state->async.fn = verify_recmaster_callback;
2478                 state->async.private_data = rmdata;
2479
2480                 /* one more control to wait for to complete */
2481                 rmdata->count++;
2482         }
2483
2484
2485         /* now wait for up to the maximum number of seconds allowed
2486            or until all nodes we expect a response from has replied
2487         */
2488         while (rmdata->count > 0) {
2489                 event_loop_once(ctdb->ev);
2490         }
2491
2492         status = rmdata->status;
2493         talloc_free(mem_ctx);
2494         return status;
2495 }
2496
2497
2498 /* called to check that the local allocation of public ip addresses is ok.
2499 */
2500 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2501 {
2502         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2503         struct ctdb_control_get_ifaces *ifaces = NULL;
2504         struct ctdb_all_public_ips *ips = NULL;
2505         struct ctdb_uptime *uptime1 = NULL;
2506         struct ctdb_uptime *uptime2 = NULL;
2507         int ret, j;
2508         bool need_iface_check = false;
2509         bool need_takeover_run = false;
2510
2511         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2512                                 CTDB_CURRENT_NODE, &uptime1);
2513         if (ret != 0) {
2514                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2515                 talloc_free(mem_ctx);
2516                 return -1;
2517         }
2518
2519
2520         /* read the interfaces from the local node */
2521         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2522         if (ret != 0) {
2523                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2524                 talloc_free(mem_ctx);
2525                 return -1;
2526         }
2527
2528         if (!rec->ifaces) {
2529                 need_iface_check = true;
2530         } else if (rec->ifaces->num != ifaces->num) {
2531                 need_iface_check = true;
2532         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2533                 need_iface_check = true;
2534         }
2535
2536         if (need_iface_check) {
2537                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2538                                      "local node %u - force takeover run\n",
2539                                      pnn));
2540                 need_takeover_run = true;
2541         }
2542
2543         /* read the ip allocation from the local node */
2544         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2545         if (ret != 0) {
2546                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2547                 talloc_free(mem_ctx);
2548                 return -1;
2549         }
2550
2551         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2552                                 CTDB_CURRENT_NODE, &uptime2);
2553         if (ret != 0) {
2554                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2555                 talloc_free(mem_ctx);
2556                 return -1;
2557         }
2558
2559         /* skip the check if the startrecovery time has changed */
2560         if (timeval_compare(&uptime1->last_recovery_started,
2561                             &uptime2->last_recovery_started) != 0) {
2562                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2563                 talloc_free(mem_ctx);
2564                 return 0;
2565         }
2566
2567         /* skip the check if the endrecovery time has changed */
2568         if (timeval_compare(&uptime1->last_recovery_finished,
2569                             &uptime2->last_recovery_finished) != 0) {
2570                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2571                 talloc_free(mem_ctx);
2572                 return 0;
2573         }
2574
2575         /* skip the check if we have started but not finished recovery */
2576         if (timeval_compare(&uptime1->last_recovery_finished,
2577                             &uptime1->last_recovery_started) != 1) {
2578                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2579                 talloc_free(mem_ctx);
2580
2581                 return 0;
2582         }
2583
2584         talloc_free(rec->ifaces);
2585         rec->ifaces = talloc_steal(rec, ifaces);
2586
2587         /* verify that we have the ip addresses we should have
2588            and we dont have ones we shouldnt have.
2589            if we find an inconsistency we set recmode to
2590            active on the local node and wait for the recmaster
2591            to do a full blown recovery.
2592            also if the pnn is -1 and we are healthy and can host the ip
2593            we also request a ip reallocation.
2594         */
2595         if (ctdb->tunable.disable_ip_failover == 0) {
2596                 for (j=0; j<ips->num; j++) {
2597                         if (ips->ips[j].pnn == -1 && nodemap->nodes[pnn].flags == 0) {
2598                                 DEBUG(DEBUG_CRIT,("Public address '%s' is not assigned and we could serve this ip\n",
2599                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2600                                 need_takeover_run = true;
2601                         } else if (ips->ips[j].pnn == pnn) {
2602                                 if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2603                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2604                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2605                                         need_takeover_run = true;
2606                                 }
2607                         } else {
2608                                 if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2609                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2610                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2611                                         need_takeover_run = true;
2612                                 }
2613                         }
2614                 }
2615         }
2616
2617         if (need_takeover_run) {
2618                 struct takeover_run_reply rd;
2619                 TDB_DATA data;
2620
2621                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2622
2623                 rd.pnn = ctdb->pnn;
2624                 rd.srvid = 0;
2625                 data.dptr = (uint8_t *)&rd;
2626                 data.dsize = sizeof(rd);
2627
2628                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2629                 if (ret != 0) {
2630                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2631                 }
2632         }
2633         talloc_free(mem_ctx);
2634         return 0;
2635 }
2636
2637
2638 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2639 {
2640         struct ctdb_node_map **remote_nodemaps = callback_data;
2641
2642         if (node_pnn >= ctdb->num_nodes) {
2643                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2644                 return;
2645         }
2646
2647         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2648
2649 }
2650
2651 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2652         struct ctdb_node_map *nodemap,
2653         struct ctdb_node_map **remote_nodemaps)
2654 {
2655         uint32_t *nodes;
2656
2657         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2658         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2659                                         nodes, 0,
2660                                         CONTROL_TIMEOUT(), false, tdb_null,
2661                                         async_getnodemap_callback,
2662                                         NULL,
2663                                         remote_nodemaps) != 0) {
2664                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2665
2666                 return -1;
2667         }
2668
2669         return 0;
2670 }
2671
/* progress of one recovery lock check */
enum reclock_child_status {
	RECLOCK_CHECKING,	/* still waiting for the child */
	RECLOCK_OK,		/* child reported a successful read */
	RECLOCK_FAILED,		/* child reported an error */
	RECLOCK_TIMEOUT		/* no answer before the timer fired */
};

/* state of one recovery lock check, see check_recovery_lock() */
struct ctdb_check_reclock_state {
	struct ctdb_context *ctdb;
	/* when the check was started */
	struct timeval start_time;
	/* pipe between us and the child: [0] read end, [1] write end */
	int fd[2];
	/* pid of the forked child */
	pid_t child;
	/* timeout for the whole check */
	struct timed_event *te;
	/* read event on fd[0] */
	struct fd_event *fde;
	enum reclock_child_status status;
};
2682
2683 /* when we free the reclock state we must kill any child process.
2684 */
2685 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2686 {
2687         struct ctdb_context *ctdb = state->ctdb;
2688
2689         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2690
2691         if (state->fd[0] != -1) {
2692                 close(state->fd[0]);
2693                 state->fd[0] = -1;
2694         }
2695         if (state->fd[1] != -1) {
2696                 close(state->fd[1]);
2697                 state->fd[1] = -1;
2698         }
2699         kill(state->child, SIGKILL);
2700         return 0;
2701 }
2702
2703 /*
2704   called if our check_reclock child times out. this would happen if
2705   i/o to the reclock file blocks.
2706  */
2707 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2708                                          struct timeval t, void *private_data)
2709 {
2710         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2711                                            struct ctdb_check_reclock_state);
2712
2713         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2714         state->status = RECLOCK_TIMEOUT;
2715 }
2716
2717 /* this is called when the child process has completed checking the reclock
2718    file and has written data back to us through the pipe.
2719 */
2720 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2721                              uint16_t flags, void *private_data)
2722 {
2723         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2724                                              struct ctdb_check_reclock_state);
2725         char c = 0;
2726         int ret;
2727
2728         /* we got a response from our child process so we can abort the
2729            timeout.
2730         */
2731         talloc_free(state->te);
2732         state->te = NULL;
2733
2734         ret = read(state->fd[0], &c, 1);
2735         if (ret != 1 || c != RECLOCK_OK) {
2736                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2737                 state->status = RECLOCK_FAILED;
2738
2739                 return;
2740         }
2741
2742         state->status = RECLOCK_OK;
2743         return;
2744 }
2745
2746 static int check_recovery_lock(struct ctdb_context *ctdb)
2747 {
2748         int ret;
2749         struct ctdb_check_reclock_state *state;
2750         pid_t parent = getpid();
2751
2752         if (ctdb->recovery_lock_fd == -1) {
2753                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2754                 return -1;
2755         }
2756
2757         state = talloc(ctdb, struct ctdb_check_reclock_state);
2758         CTDB_NO_MEMORY(ctdb, state);
2759
2760         state->ctdb = ctdb;
2761         state->start_time = timeval_current();
2762         state->status = RECLOCK_CHECKING;
2763         state->fd[0] = -1;
2764         state->fd[1] = -1;
2765
2766         ret = pipe(state->fd);
2767         if (ret != 0) {
2768                 talloc_free(state);
2769                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2770                 return -1;
2771         }
2772
2773         state->child = ctdb_fork(ctdb);
2774         if (state->child == (pid_t)-1) {
2775                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2776                 close(state->fd[0]);
2777                 state->fd[0] = -1;
2778                 close(state->fd[1]);
2779                 state->fd[1] = -1;
2780                 talloc_free(state);
2781                 return -1;
2782         }
2783
2784         if (state->child == 0) {
2785                 char cc = RECLOCK_OK;
2786                 close(state->fd[0]);
2787                 state->fd[0] = -1;
2788
2789                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
2790                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2791                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2792                         cc = RECLOCK_FAILED;
2793                 }
2794
2795                 write(state->fd[1], &cc, 1);
2796                 /* make sure we die when our parent dies */
2797                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2798                         sleep(5);
2799                         write(state->fd[1], &cc, 1);
2800                 }
2801                 _exit(0);
2802         }
2803         close(state->fd[1]);
2804         state->fd[1] = -1;
2805         set_close_on_exec(state->fd[0]);
2806
2807         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2808
2809         talloc_set_destructor(state, check_reclock_destructor);
2810
2811         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2812                                     ctdb_check_reclock_timeout, state);
2813         if (state->te == NULL) {
2814                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2815                 talloc_free(state);
2816                 return -1;
2817         }
2818
2819         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2820                                 EVENT_FD_READ,
2821                                 reclock_child_handler,
2822                                 (void *)state);
2823
2824         if (state->fde == NULL) {
2825                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2826                 talloc_free(state);
2827                 return -1;
2828         }
2829         tevent_fd_set_auto_close(state->fde);
2830
2831         while (state->status == RECLOCK_CHECKING) {
2832                 event_loop_once(ctdb->ev);
2833         }
2834
2835         if (state->status == RECLOCK_FAILED) {
2836                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2837                 close(ctdb->recovery_lock_fd);
2838                 ctdb->recovery_lock_fd = -1;
2839                 talloc_free(state);
2840                 return -1;
2841         }
2842
2843         talloc_free(state);
2844         return 0;
2845 }
2846
2847 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2848 {
2849         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2850         const char *reclockfile;
2851
2852         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2853                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2854                 talloc_free(tmp_ctx);
2855                 return -1;      
2856         }
2857
2858         if (reclockfile == NULL) {
2859                 if (ctdb->recovery_lock_file != NULL) {
2860                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2861                         talloc_free(ctdb->recovery_lock_file);
2862                         ctdb->recovery_lock_file = NULL;
2863                         if (ctdb->recovery_lock_fd != -1) {
2864                                 close(ctdb->recovery_lock_fd);
2865                                 ctdb->recovery_lock_fd = -1;
2866                         }
2867                 }
2868                 ctdb->tunable.verify_recovery_lock = 0;
2869                 talloc_free(tmp_ctx);
2870                 return 0;
2871         }
2872
2873         if (ctdb->recovery_lock_file == NULL) {
2874                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2875                 if (ctdb->recovery_lock_fd != -1) {
2876                         close(ctdb->recovery_lock_fd);
2877                         ctdb->recovery_lock_fd = -1;
2878                 }
2879                 talloc_free(tmp_ctx);
2880                 return 0;
2881         }
2882
2883
2884         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2885                 talloc_free(tmp_ctx);
2886                 return 0;
2887         }
2888
2889         talloc_free(ctdb->recovery_lock_file);
2890         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2891         ctdb->tunable.verify_recovery_lock = 0;
2892         if (ctdb->recovery_lock_fd != -1) {
2893                 close(ctdb->recovery_lock_fd);
2894                 ctdb->recovery_lock_fd = -1;
2895         }
2896
2897         talloc_free(tmp_ctx);
2898         return 0;
2899 }
2900
2901 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2902                       TALLOC_CTX *mem_ctx)
2903 {
2904         uint32_t pnn;
2905         struct ctdb_node_map *nodemap=NULL;
2906         struct ctdb_node_map *recmaster_nodemap=NULL;
2907         struct ctdb_node_map **remote_nodemaps=NULL;
2908         struct ctdb_vnn_map *vnnmap=NULL;
2909         struct ctdb_vnn_map *remote_vnnmap=NULL;
2910         int32_t debug_level;
2911         int i, j, ret;
2912
2913
2914
2915         /* verify that the main daemon is still running */
2916         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2917                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2918                 exit(-1);
2919         }
2920
2921         /* ping the local daemon to tell it we are alive */
2922         ctdb_ctrl_recd_ping(ctdb);
2923
2924         if (rec->election_timeout) {
2925                 /* an election is in progress */
2926                 return;
2927         }
2928
2929         /* read the debug level from the parent and update locally */
2930         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2931         if (ret !=0) {
2932                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2933                 return;
2934         }
2935         LogLevel = debug_level;
2936
2937
2938         /* We must check if we need to ban a node here but we want to do this
2939            as early as possible so we dont wait until we have pulled the node
2940            map from the local node. thats why we have the hardcoded value 20
2941         */
2942         for (i=0; i<ctdb->num_nodes; i++) {
2943                 struct ctdb_banning_state *ban_state;
2944
2945                 if (ctdb->nodes[i]->ban_state == NULL) {
2946                         continue;
2947                 }
2948                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2949                 if (ban_state->count < 20) {
2950                         continue;
2951                 }
2952                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2953                         ctdb->nodes[i]->pnn, ban_state->count,
2954                         ctdb->tunable.recovery_ban_period));
2955                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2956                 ban_state->count = 0;
2957         }
2958
2959         /* get relevant tunables */
2960         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2961         if (ret != 0) {
2962                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2963                 return;
2964         }
2965
2966         /* get the current recovery lock file from the server */
2967         if (update_recovery_lock_file(ctdb) != 0) {
2968                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2969                 return;
2970         }
2971
2972         /* Make sure that if recovery lock verification becomes disabled when
2973            we close the file
2974         */
2975         if (ctdb->tunable.verify_recovery_lock == 0) {
2976                 if (ctdb->recovery_lock_fd != -1) {
2977                         close(ctdb->recovery_lock_fd);
2978                         ctdb->recovery_lock_fd = -1;
2979                 }
2980         }
2981
2982         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2983         if (pnn == (uint32_t)-1) {
2984                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2985                 return;
2986         }
2987
2988         /* get the vnnmap */
2989         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2990         if (ret != 0) {
2991                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2992                 return;
2993         }
2994
2995
2996         /* get number of nodes */
2997         if (rec->nodemap) {
2998                 talloc_free(rec->nodemap);
2999                 rec->nodemap = NULL;
3000                 nodemap=NULL;
3001         }
3002         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3003         if (ret != 0) {
3004                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3005                 return;
3006         }
3007         nodemap = rec->nodemap;
3008
3009         /* check which node is the recovery master */
3010         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3011         if (ret != 0) {
3012                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3013                 return;
3014         }
3015
3016         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3017         if (rec->recmaster != pnn) {
3018                 if (rec->ip_reallocate_ctx != NULL) {
3019                         talloc_free(rec->ip_reallocate_ctx);
3020                         rec->ip_reallocate_ctx = NULL;
3021                         rec->reallocate_callers = NULL;
3022                 }
3023         }
3024         /* if there are takeovers requested, perform it and notify the waiters */
3025         if (rec->reallocate_callers) {
3026                 process_ipreallocate_requests(ctdb, rec);
3027         }
3028
3029         if (rec->recmaster == (uint32_t)-1) {
3030                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3031                 force_election(rec, pnn, nodemap);
3032                 return;
3033         }
3034
3035
3036         /* if the local daemon is STOPPED, we verify that the databases are
3037            also frozen and thet the recmode is set to active 
3038         */
3039         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3040                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3041                 if (ret != 0) {
3042                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3043                 }
3044                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3045                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3046
3047                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3048                         if (ret != 0) {
3049                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3050                                 return;
3051                         }
3052                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3053                         if (ret != 0) {
3054                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3055
3056                                 return;
3057                         }
3058                         return;
3059                 }
3060         }
3061         /* If the local node is stopped, verify we are not the recmaster 
3062            and yield this role if so
3063         */
3064         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3065                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3066                 force_election(rec, pnn, nodemap);
3067                 return;
3068         }
3069         
3070         /* check that we (recovery daemon) and the local ctdb daemon
3071            agrees on whether we are banned or not
3072         */
3073 //qqq
3074
3075         /* remember our own node flags */
3076         rec->node_flags = nodemap->nodes[pnn].flags;
3077
3078         /* count how many active nodes there are */
3079         rec->num_active    = 0;
3080         rec->num_connected = 0;
3081         for (i=0; i<nodemap->num; i++) {
3082                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3083                         rec->num_active++;
3084                 }
3085                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3086                         rec->num_connected++;
3087                 }
3088         }
3089
3090
3091         /* verify that the recmaster node is still active */
3092         for (j=0; j<nodemap->num; j++) {
3093                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3094                         break;
3095                 }
3096         }
3097
3098         if (j == nodemap->num) {
3099                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3100                 force_election(rec, pnn, nodemap);
3101                 return;
3102         }
3103
3104         /* if recovery master is disconnected we must elect a new recmaster */
3105         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3106                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3107                 force_election(rec, pnn, nodemap);
3108                 return;
3109         }
3110
3111         /* grab the nodemap from the recovery master to check if it is banned */
3112         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3113                                    mem_ctx, &recmaster_nodemap);
3114         if (ret != 0) {
3115                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3116                           nodemap->nodes[j].pnn));
3117                 return;
3118         }
3119
3120
3121         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3122                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3123                 force_election(rec, pnn, nodemap);
3124                 return;
3125         }
3126
3127
3128         /* verify that we have all ip addresses we should have and we dont
3129          * have addresses we shouldnt have.
3130          */ 
3131         if (ctdb->tunable.disable_ip_failover == 0) {
3132                 if (rec->ip_check_disable_ctx == NULL) {
3133                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3134                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3135                         }
3136                 }
3137         }
3138
3139
3140         /* if we are not the recmaster then we do not need to check
3141            if recovery is needed
3142          */
3143         if (pnn != rec->recmaster) {
3144                 return;
3145         }
3146
3147
3148         /* ensure our local copies of flags are right */
3149         ret = update_local_flags(rec, nodemap);
3150         if (ret == MONITOR_ELECTION_NEEDED) {
3151                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3152                 force_election(rec, pnn, nodemap);
3153                 return;
3154         }
3155         if (ret != MONITOR_OK) {
3156                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3157                 return;
3158         }
3159
3160         if (ctdb->num_nodes != nodemap->num) {
3161                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3162                 reload_nodes_file(ctdb);
3163                 return;
3164         }
3165
3166         /* verify that all active nodes agree that we are the recmaster */
3167         switch (verify_recmaster(rec, nodemap, pnn)) {
3168         case MONITOR_RECOVERY_NEEDED:
3169                 /* can not happen */
3170                 return;
3171         case MONITOR_ELECTION_NEEDED:
3172                 force_election(rec, pnn, nodemap);
3173                 return;
3174         case MONITOR_OK:
3175                 break;
3176         case MONITOR_FAILED:
3177                 return;
3178         }
3179
3180
3181         if (rec->need_recovery) {
3182                 /* a previous recovery didn't finish */
3183                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3184                 return;
3185         }
3186
3187         /* verify that all active nodes are in normal mode 
3188            and not in recovery mode 
3189         */
3190         switch (verify_recmode(ctdb, nodemap)) {
3191         case MONITOR_RECOVERY_NEEDED:
3192                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3193                 return;
3194         case MONITOR_FAILED:
3195                 return;
3196         case MONITOR_ELECTION_NEEDED:
3197                 /* can not happen */
3198         case MONITOR_OK:
3199                 break;
3200         }
3201
3202
3203         if (ctdb->tunable.verify_recovery_lock != 0) {
3204                 /* we should have the reclock - check its not stale */
3205                 ret = check_recovery_lock(ctdb);
3206                 if (ret != 0) {
3207                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3208                         ctdb_set_culprit(rec, ctdb->pnn);
3209                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3210                         return;
3211                 }
3212         }
3213
3214         /* get the nodemap for all active remote nodes
3215          */
3216         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3217         if (remote_nodemaps == NULL) {
3218                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3219                 return;
3220         }
3221         for(i=0; i<nodemap->num; i++) {
3222                 remote_nodemaps[i] = NULL;
3223         }
3224         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3225                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3226                 return;
3227         } 
3228
3229         /* verify that all other nodes have the same nodemap as we have
3230         */
3231         for (j=0; j<nodemap->num; j++) {
3232                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3233                         continue;
3234                 }
3235
3236                 if (remote_nodemaps[j] == NULL) {
3237                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3238                         ctdb_set_culprit(rec, j);
3239
3240                         return;
3241                 }
3242
3243                 /* if the nodes disagree on how many nodes there are
3244                    then this is a good reason to try recovery
3245                  */
3246                 if (remote_nodemaps[j]->num != nodemap->num) {
3247                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3248                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3249                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3250                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3251                         return;
3252                 }
3253
3254                 /* if the nodes disagree on which nodes exist and are
3255                    active, then that is also a good reason to do recovery
3256                  */
3257                 for (i=0;i<nodemap->num;i++) {
3258                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3259                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3260                                           nodemap->nodes[j].pnn, i, 
3261                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3262                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3263                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3264                                             vnnmap);
3265                                 return;
3266                         }
3267                 }
3268
3269                 /* verify the flags are consistent
3270                 */
3271                 for (i=0; i<nodemap->num; i++) {
3272                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3273                                 continue;
3274                         }
3275                         
3276                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3277                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3278                                   nodemap->nodes[j].pnn, 
3279                                   nodemap->nodes[i].pnn, 
3280                                   remote_nodemaps[j]->nodes[i].flags,
3281                                   nodemap->nodes[j].flags));
3282                                 if (i == j) {
3283                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3284                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3285                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3286                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3287                                                     vnnmap);
3288                                         return;
3289                                 } else {
3290                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3291                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3292                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3293                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3294                                                     vnnmap);
3295                                         return;
3296                                 }
3297                         }
3298                 }
3299         }
3300
3301
3302         /* there better be the same number of lmasters in the vnn map
3303            as there are active nodes or we will have to do a recovery
3304          */
3305         if (vnnmap->size != rec->num_active) {
3306                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3307                           vnnmap->size, rec->num_active));
3308                 ctdb_set_culprit(rec, ctdb->pnn);
3309                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3310                 return;
3311         }
3312
3313         /* verify that all active nodes in the nodemap also exist in 
3314            the vnnmap.
3315          */
3316         for (j=0; j<nodemap->num; j++) {
3317                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3318                         continue;
3319                 }
3320                 if (nodemap->nodes[j].pnn == pnn) {
3321                         continue;
3322                 }
3323
3324                 for (i=0; i<vnnmap->size; i++) {
3325                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3326                                 break;
3327                         }
3328                 }
3329                 if (i == vnnmap->size) {
3330                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3331                                   nodemap->nodes[j].pnn));
3332                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3333                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3334                         return;
3335                 }
3336         }
3337
3338         
3339         /* verify that all other nodes have the same vnnmap
3340            and are from the same generation
3341          */
3342         for (j=0; j<nodemap->num; j++) {
3343                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3344                         continue;
3345                 }
3346                 if (nodemap->nodes[j].pnn == pnn) {
3347                         continue;
3348                 }
3349
3350                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3351                                           mem_ctx, &remote_vnnmap);
3352                 if (ret != 0) {
3353                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3354                                   nodemap->nodes[j].pnn));
3355                         return;
3356                 }
3357
3358                 /* verify the vnnmap generation is the same */
3359                 if (vnnmap->generation != remote_vnnmap->generation) {
3360                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3361                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3362                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3363                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3364                         return;
3365                 }
3366
3367                 /* verify the vnnmap size is the same */
3368                 if (vnnmap->size != remote_vnnmap->size) {
3369                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3370                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3371                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3372                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3373                         return;
3374                 }
3375
3376                 /* verify the vnnmap is the same */
3377                 for (i=0;i<vnnmap->size;i++) {
3378                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3379                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3380                                           nodemap->nodes[j].pnn));
3381                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3382                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3383                                             vnnmap);
3384                                 return;
3385                         }
3386                 }
3387         }
3388
3389         /* we might need to change who has what IP assigned */
3390         if (rec->need_takeover_run) {
3391                 uint32_t culprit = (uint32_t)-1;
3392
3393                 rec->need_takeover_run = false;
3394
3395                 /* update the list of public ips that a node can handle for
3396                    all connected nodes
3397                 */
3398                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3399                 if (ret != 0) {
3400                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3401                                          culprit));
3402                         rec->need_takeover_run = true;
3403                         return;
3404                 }
3405
3406                 /* execute the "startrecovery" event script on all nodes */
3407                 ret = run_startrecovery_eventscript(rec, nodemap);
3408                 if (ret!=0) {
3409                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3410                         ctdb_set_culprit(rec, ctdb->pnn);
3411                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3412                         return;
3413                 }
3414
3415                 ret = ctdb_takeover_run(ctdb, nodemap);
3416                 if (ret != 0) {
3417                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Try again later\n"));
3418                         return;
3419                 }
3420
3421                 /* execute the "recovered" event script on all nodes */
3422                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3423 #if 0
3424 // we cant check whether the event completed successfully
3425 // since this script WILL fail if the node is in recovery mode
3426 // and if that race happens, the code here would just cause a second
3427 // cascading recovery.
3428                 if (ret!=0) {
3429                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3430                         ctdb_set_culprit(rec, ctdb->pnn);
3431                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3432                 }
3433 #endif
3434         }
3435 }
3436
3437 /*
3438   the main monitoring loop
3439  */
3440 static void monitor_cluster(struct ctdb_context *ctdb)
3441 {
3442         struct ctdb_recoverd *rec;
3443
3444         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3445
3446         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3447         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3448
3449         rec->ctdb = ctdb;
3450
3451         rec->priority_time = timeval_current();
3452
3453         /* register a message port for sending memory dumps */
3454         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3455
3456         /* register a message port for recovery elections */
3457         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3458
3459         /* when nodes are disabled/enabled */
3460         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3461
3462         /* when we are asked to puch out a flag change */
3463         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3464
3465         /* register a message port for vacuum fetch */
3466         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3467
3468         /* register a message port for reloadnodes  */
3469         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3470
3471         /* register a message port for performing a takeover run */
3472         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3473
3474         /* register a message port for disabling the ip check for a short while */
3475         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3476
3477         /* register a message port for updating the recovery daemons node assignment for an ip */
3478         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3479
3480         for (;;) {
3481                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3482                 struct timeval start;
3483                 double elapsed;
3484
3485                 if (!mem_ctx) {
3486                         DEBUG(DEBUG_CRIT,(__location__
3487                                           " Failed to create temp context\n"));
3488                         exit(-1);
3489                 }
3490
3491                 start = timeval_current();
3492                 main_loop(ctdb, rec, mem_ctx);
3493                 talloc_free(mem_ctx);
3494
3495                 /* we only check for recovery once every second */
3496                 elapsed = timeval_elapsed(&start);
3497                 if (elapsed < ctdb->tunable.recover_interval) {
3498                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3499                                           - elapsed);
3500                 }
3501         }
3502 }
3503
3504 /*
3505   event handler for when the main ctdbd dies
3506  */
3507 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3508                                  uint16_t flags, void *private_data)
3509 {
3510         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3511         _exit(1);
3512 }
3513
3514 /*
3515   called regularly to verify that the recovery daemon is still running
3516  */
3517 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3518                               struct timeval yt, void *p)
3519 {
3520         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3521
3522         if (kill(ctdb->recoverd_pid, 0) != 0) {
3523                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3524
3525                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3526                                 ctdb_restart_recd, ctdb);
3527
3528                 return;
3529         }
3530
3531         event_add_timed(ctdb->ev, ctdb, 
3532                         timeval_current_ofs(30, 0),
3533                         ctdb_check_recd, ctdb);
3534 }
3535
3536 static void recd_sig_child_handler(struct event_context *ev,
3537         struct signal_event *se, int signum, int count,
3538         void *dont_care, 
3539         void *private_data)
3540 {
3541 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3542         int status;
3543         pid_t pid = -1;
3544
3545         while (pid != 0) {
3546                 pid = waitpid(-1, &status, WNOHANG);
3547                 if (pid == -1) {
3548                         if (errno != ECHILD) {
3549                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3550                         }
3551                         return;
3552                 }
3553                 if (pid > 0) {
3554                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3555                 }
3556         }
3557 }
3558
3559 /*
3560   startup the recovery daemon as a child of the main ctdb daemon
3561  */
3562 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3563 {
3564         int fd[2];
3565         struct signal_event *se;
3566         struct tevent_fd *fde;
3567
3568         if (pipe(fd) != 0) {
3569                 return -1;
3570         }
3571
3572         ctdb->ctdbd_pid = getpid();
3573
3574         ctdb->recoverd_pid = fork();
3575         if (ctdb->recoverd_pid == -1) {
3576                 return -1;
3577         }
3578         
3579         if (ctdb->recoverd_pid != 0) {
3580                 close(fd[0]);
3581                 event_add_timed(ctdb->ev, ctdb, 
3582                                 timeval_current_ofs(30, 0),
3583                                 ctdb_check_recd, ctdb);
3584                 return 0;
3585         }
3586
3587         close(fd[1]);
3588
3589         srandom(getpid() ^ time(NULL));
3590
3591         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3592                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3593                 exit(1);
3594         }
3595
3596         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3597
3598         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3599                      ctdb_recoverd_parent, &fd[0]);     
3600         tevent_fd_set_auto_close(fde);
3601
3602         /* set up a handler to pick up sigchld */
3603         se = event_add_signal(ctdb->ev, ctdb,
3604                                      SIGCHLD, 0,
3605                                      recd_sig_child_handler,
3606                                      ctdb);
3607         if (se == NULL) {
3608                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3609                 exit(1);
3610         }
3611
3612         monitor_cluster(ctdb);
3613
3614         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3615         return -1;
3616 }
3617
3618 /*
3619   shutdown the recovery daemon
3620  */
3621 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3622 {
3623         if (ctdb->recoverd_pid == 0) {
3624                 return;
3625         }
3626
3627         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3628         kill(ctdb->recoverd_pid, SIGTERM);
3629 }
3630
3631 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
3632                        struct timeval t, void *private_data)
3633 {
3634         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3635
3636         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3637         ctdb_stop_recoverd(ctdb);
3638         ctdb_start_recoverd(ctdb);
3639 }