vacuum: check lmaster against num_nodes instead of vnn_map->size
[sahlberg/ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
70 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
71 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
72
73 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
74
75 /*
76   ban a node for a period of time
77  */
78 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
79 {
80         int ret;
81         struct ctdb_context *ctdb = rec->ctdb;
82         struct ctdb_ban_time bantime;
83        
84         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
85
86         if (!ctdb_validate_pnn(ctdb, pnn)) {
87                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
88                 return;
89         }
90
91         bantime.pnn  = pnn;
92         bantime.time = ban_time;
93
94         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
95         if (ret != 0) {
96                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
97                 return;
98         }
99
100 }
101
102 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
103
104
105 /*
106   run the "recovered" eventscript on all nodes
107  */
108 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
109 {
110         TALLOC_CTX *tmp_ctx;
111         uint32_t *nodes;
112
113         tmp_ctx = talloc_new(ctdb);
114         CTDB_NO_MEMORY(ctdb, tmp_ctx);
115
116         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
117         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
118                                         nodes, 0,
119                                         CONTROL_TIMEOUT(), false, tdb_null,
120                                         NULL, NULL,
121                                         NULL) != 0) {
122                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
123
124                 talloc_free(tmp_ctx);
125                 return -1;
126         }
127
128         talloc_free(tmp_ctx);
129         return 0;
130 }
131
132 /*
133   remember the trouble maker
134  */
135 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
136 {
137         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
138         struct ctdb_banning_state *ban_state;
139
140         if (culprit > ctdb->num_nodes) {
141                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
142                 return;
143         }
144
145         if (ctdb->nodes[culprit]->ban_state == NULL) {
146                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
147                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
148
149                 
150         }
151         ban_state = ctdb->nodes[culprit]->ban_state;
152         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
153                 /* this was the first time in a long while this node
154                    misbehaved so we will forgive any old transgressions.
155                 */
156                 ban_state->count = 0;
157         }
158
159         ban_state->count += count;
160         ban_state->last_reported_time = timeval_current();
161         rec->last_culprit_node = culprit;
162 }
163
164 /*
165   remember the trouble maker
166  */
167 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
168 {
169         ctdb_set_culprit_count(rec, culprit, 1);
170 }
171
172
173 /* this callback is called for every node that failed to execute the
174    start recovery event
175 */
176 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
177 {
178         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
179
180         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
181
182         ctdb_set_culprit(rec, node_pnn);
183 }
184
185 /*
186   run the "startrecovery" eventscript on all nodes
187  */
188 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
189 {
190         TALLOC_CTX *tmp_ctx;
191         uint32_t *nodes;
192         struct ctdb_context *ctdb = rec->ctdb;
193
194         tmp_ctx = talloc_new(ctdb);
195         CTDB_NO_MEMORY(ctdb, tmp_ctx);
196
197         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
198         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
199                                         nodes, 0,
200                                         CONTROL_TIMEOUT(), false, tdb_null,
201                                         NULL,
202                                         startrecovery_fail_callback,
203                                         rec) != 0) {
204                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
205                 talloc_free(tmp_ctx);
206                 return -1;
207         }
208
209         talloc_free(tmp_ctx);
210         return 0;
211 }
212
213 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
214 {
215         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
216                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
217                 return;
218         }
219         if (node_pnn < ctdb->num_nodes) {
220                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
221         }
222 }
223
224 /*
225   update the node capabilities for all connected nodes
226  */
227 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
228 {
229         uint32_t *nodes;
230         TALLOC_CTX *tmp_ctx;
231
232         tmp_ctx = talloc_new(ctdb);
233         CTDB_NO_MEMORY(ctdb, tmp_ctx);
234
235         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
236         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
237                                         nodes, 0,
238                                         CONTROL_TIMEOUT(),
239                                         false, tdb_null,
240                                         async_getcap_callback, NULL,
241                                         NULL) != 0) {
242                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
243                 talloc_free(tmp_ctx);
244                 return -1;
245         }
246
247         talloc_free(tmp_ctx);
248         return 0;
249 }
250
251 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
252 {
253         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
254
255         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
256         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
257 }
258
259 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
260 {
261         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
262
263         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
264         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
265 }
266
267 /*
268   change recovery mode on all nodes
269  */
270 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
271 {
272         TDB_DATA data;
273         uint32_t *nodes;
274         TALLOC_CTX *tmp_ctx;
275
276         tmp_ctx = talloc_new(ctdb);
277         CTDB_NO_MEMORY(ctdb, tmp_ctx);
278
279         /* freeze all nodes */
280         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
281         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
282                 int i;
283
284                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
285                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
286                                                 nodes, i,
287                                                 CONTROL_TIMEOUT(),
288                                                 false, tdb_null,
289                                                 NULL,
290                                                 set_recmode_fail_callback,
291                                                 rec) != 0) {
292                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
293                                 talloc_free(tmp_ctx);
294                                 return -1;
295                         }
296                 }
297         }
298
299
300         data.dsize = sizeof(uint32_t);
301         data.dptr = (unsigned char *)&rec_mode;
302
303         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
304                                         nodes, 0,
305                                         CONTROL_TIMEOUT(),
306                                         false, data,
307                                         NULL, NULL,
308                                         NULL) != 0) {
309                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
310                 talloc_free(tmp_ctx);
311                 return -1;
312         }
313
314         talloc_free(tmp_ctx);
315         return 0;
316 }
317
318 /*
319   change recovery master on all node
320  */
321 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
322 {
323         TDB_DATA data;
324         TALLOC_CTX *tmp_ctx;
325         uint32_t *nodes;
326
327         tmp_ctx = talloc_new(ctdb);
328         CTDB_NO_MEMORY(ctdb, tmp_ctx);
329
330         data.dsize = sizeof(uint32_t);
331         data.dptr = (unsigned char *)&pnn;
332
333         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
334         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
335                                         nodes, 0,
336                                         CONTROL_TIMEOUT(), false, data,
337                                         NULL, NULL,
338                                         NULL) != 0) {
339                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
340                 talloc_free(tmp_ctx);
341                 return -1;
342         }
343
344         talloc_free(tmp_ctx);
345         return 0;
346 }
347
348 /* update all remote nodes to use the same db priority that we have
349    this can fail if the remove node has not yet been upgraded to 
350    support this function, so we always return success and never fail
351    a recovery if this call fails.
352 */
353 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
354         struct ctdb_node_map *nodemap, 
355         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
356 {
357         int db;
358         uint32_t *nodes;
359
360         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
361
362         /* step through all local databases */
363         for (db=0; db<dbmap->num;db++) {
364                 TDB_DATA data;
365                 struct ctdb_db_priority db_prio;
366                 int ret;
367
368                 db_prio.db_id     = dbmap->dbs[db].dbid;
369                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
370                 if (ret != 0) {
371                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
372                         continue;
373                 }
374
375                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
376
377                 data.dptr  = (uint8_t *)&db_prio;
378                 data.dsize = sizeof(db_prio);
379
380                 if (ctdb_client_async_control(ctdb,
381                                         CTDB_CONTROL_SET_DB_PRIORITY,
382                                         nodes, 0,
383                                         CONTROL_TIMEOUT(), false, data,
384                                         NULL, NULL,
385                                         NULL) != 0) {
386                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
387                 }
388         }
389
390         return 0;
391 }                       
392
393 /*
394   ensure all other nodes have attached to any databases that we have
395  */
396 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
397                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
398 {
399         int i, j, db, ret;
400         struct ctdb_dbid_map *remote_dbmap;
401
402         /* verify that all other nodes have all our databases */
403         for (j=0; j<nodemap->num; j++) {
404                 /* we dont need to ourself ourselves */
405                 if (nodemap->nodes[j].pnn == pnn) {
406                         continue;
407                 }
408                 /* dont check nodes that are unavailable */
409                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
410                         continue;
411                 }
412
413                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
414                                          mem_ctx, &remote_dbmap);
415                 if (ret != 0) {
416                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
417                         return -1;
418                 }
419
420                 /* step through all local databases */
421                 for (db=0; db<dbmap->num;db++) {
422                         const char *name;
423
424
425                         for (i=0;i<remote_dbmap->num;i++) {
426                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
427                                         break;
428                                 }
429                         }
430                         /* the remote node already have this database */
431                         if (i!=remote_dbmap->num) {
432                                 continue;
433                         }
434                         /* ok so we need to create this database */
435                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
436                                             mem_ctx, &name);
437                         if (ret != 0) {
438                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
439                                 return -1;
440                         }
441                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
442                                            mem_ctx, name, dbmap->dbs[db].persistent);
443                         if (ret != 0) {
444                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
445                                 return -1;
446                         }
447                 }
448         }
449
450         return 0;
451 }
452
453
454 /*
455   ensure we are attached to any databases that anyone else is attached to
456  */
457 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
458                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
459 {
460         int i, j, db, ret;
461         struct ctdb_dbid_map *remote_dbmap;
462
463         /* verify that we have all database any other node has */
464         for (j=0; j<nodemap->num; j++) {
465                 /* we dont need to ourself ourselves */
466                 if (nodemap->nodes[j].pnn == pnn) {
467                         continue;
468                 }
469                 /* dont check nodes that are unavailable */
470                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
471                         continue;
472                 }
473
474                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
475                                          mem_ctx, &remote_dbmap);
476                 if (ret != 0) {
477                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
478                         return -1;
479                 }
480
481                 /* step through all databases on the remote node */
482                 for (db=0; db<remote_dbmap->num;db++) {
483                         const char *name;
484
485                         for (i=0;i<(*dbmap)->num;i++) {
486                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
487                                         break;
488                                 }
489                         }
490                         /* we already have this db locally */
491                         if (i!=(*dbmap)->num) {
492                                 continue;
493                         }
494                         /* ok so we need to create this database and
495                            rebuild dbmap
496                          */
497                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
498                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
499                         if (ret != 0) {
500                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
501                                           nodemap->nodes[j].pnn));
502                                 return -1;
503                         }
504                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
505                                            remote_dbmap->dbs[db].persistent);
506                         if (ret != 0) {
507                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
508                                 return -1;
509                         }
510                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
511                         if (ret != 0) {
512                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
513                                 return -1;
514                         }
515                 }
516         }
517
518         return 0;
519 }
520
521
522 /*
523   pull the remote database contents from one node into the recdb
524  */
525 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
526                                     struct tdb_wrap *recdb, uint32_t dbid,
527                                     bool persistent)
528 {
529         int ret;
530         TDB_DATA outdata;
531         struct ctdb_marshall_buffer *reply;
532         struct ctdb_rec_data *rec;
533         int i;
534         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
535
536         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
537                                CONTROL_TIMEOUT(), &outdata);
538         if (ret != 0) {
539                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
540                 talloc_free(tmp_ctx);
541                 return -1;
542         }
543
544         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
545
546         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
547                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
548                 talloc_free(tmp_ctx);
549                 return -1;
550         }
551         
552         rec = (struct ctdb_rec_data *)&reply->data[0];
553         
554         for (i=0;
555              i<reply->count;
556              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
557                 TDB_DATA key, data;
558                 struct ctdb_ltdb_header *hdr;
559                 TDB_DATA existing;
560                 
561                 key.dptr = &rec->data[0];
562                 key.dsize = rec->keylen;
563                 data.dptr = &rec->data[key.dsize];
564                 data.dsize = rec->datalen;
565                 
566                 hdr = (struct ctdb_ltdb_header *)data.dptr;
567
568                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
569                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
570                         talloc_free(tmp_ctx);
571                         return -1;
572                 }
573
574                 /* fetch the existing record, if any */
575                 existing = tdb_fetch(recdb->tdb, key);
576                 
577                 if (existing.dptr != NULL) {
578                         struct ctdb_ltdb_header header;
579                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
580                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
581                                          (unsigned)existing.dsize, srcnode));
582                                 free(existing.dptr);
583                                 talloc_free(tmp_ctx);
584                                 return -1;
585                         }
586                         header = *(struct ctdb_ltdb_header *)existing.dptr;
587                         free(existing.dptr);
588                         if (!(header.rsn < hdr->rsn ||
589                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
590                                 continue;
591                         }
592                 }
593                 
594                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
595                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
596                         talloc_free(tmp_ctx);
597                         return -1;                              
598                 }
599         }
600
601         talloc_free(tmp_ctx);
602
603         return 0;
604 }
605
606 /*
607   pull all the remote database contents into the recdb
608  */
609 static int pull_remote_database(struct ctdb_context *ctdb,
610                                 struct ctdb_recoverd *rec, 
611                                 struct ctdb_node_map *nodemap, 
612                                 struct tdb_wrap *recdb, uint32_t dbid,
613                                 bool persistent)
614 {
615         int j;
616
617         /* pull all records from all other nodes across onto this node
618            (this merges based on rsn)
619         */
620         for (j=0; j<nodemap->num; j++) {
621                 /* dont merge from nodes that are unavailable */
622                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
623                         continue;
624                 }
625                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
626                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
627                                  nodemap->nodes[j].pnn));
628                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
629                         return -1;
630                 }
631         }
632         
633         return 0;
634 }
635
636
637 /*
638   update flags on all active nodes
639  */
640 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
641 {
642         int ret;
643
644         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
645                 if (ret != 0) {
646                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
647                 return -1;
648         }
649
650         return 0;
651 }
652
653 /*
654   ensure all nodes have the same vnnmap we do
655  */
656 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
657                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
658 {
659         int j, ret;
660
661         /* push the new vnn map out to all the nodes */
662         for (j=0; j<nodemap->num; j++) {
663                 /* dont push to nodes that are unavailable */
664                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
665                         continue;
666                 }
667
668                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
669                 if (ret != 0) {
670                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
671                         return -1;
672                 }
673         }
674
675         return 0;
676 }
677
678
679 struct vacuum_info {
680         struct vacuum_info *next, *prev;
681         struct ctdb_recoverd *rec;
682         uint32_t srcnode;
683         struct ctdb_db_context *ctdb_db;
684         struct ctdb_marshall_buffer *recs;
685         struct ctdb_rec_data *r;
686 };
687
688 static void vacuum_fetch_next(struct vacuum_info *v);
689
690 /*
691   called when a vacuum fetch has completed - just free it and do the next one
692  */
693 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
694 {
695         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
696         talloc_free(state);
697         vacuum_fetch_next(v);
698 }
699
700
701 /*
702   process the next element from the vacuum list
703 */
704 static void vacuum_fetch_next(struct vacuum_info *v)
705 {
706         struct ctdb_call call;
707         struct ctdb_rec_data *r;
708
709         while (v->recs->count) {
710                 struct ctdb_client_call_state *state;
711                 TDB_DATA data;
712                 struct ctdb_ltdb_header *hdr;
713
714                 ZERO_STRUCT(call);
715                 call.call_id = CTDB_NULL_FUNC;
716                 call.flags = CTDB_IMMEDIATE_MIGRATION;
717
718                 r = v->r;
719                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
720                 v->recs->count--;
721
722                 call.key.dptr = &r->data[0];
723                 call.key.dsize = r->keylen;
724
725                 /* ensure we don't block this daemon - just skip a record if we can't get
726                    the chainlock */
727                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
728                         continue;
729                 }
730
731                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
732                 if (data.dptr == NULL) {
733                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
734                         continue;
735                 }
736
737                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
738                         free(data.dptr);
739                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
740                         continue;
741                 }
742                 
743                 hdr = (struct ctdb_ltdb_header *)data.dptr;
744                 if (hdr->dmaster == v->rec->ctdb->pnn) {
745                         /* its already local */
746                         free(data.dptr);
747                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
748                         continue;
749                 }
750
751                 free(data.dptr);
752
753                 state = ctdb_call_send(v->ctdb_db, &call);
754                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
755                 if (state == NULL) {
756                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
757                         talloc_free(v);
758                         return;
759                 }
760                 state->async.fn = vacuum_fetch_callback;
761                 state->async.private_data = v;
762                 return;
763         }
764
765         talloc_free(v);
766 }
767
768
769 /*
770   destroy a vacuum info structure
771  */
772 static int vacuum_info_destructor(struct vacuum_info *v)
773 {
774         DLIST_REMOVE(v->rec->vacuum_info, v);
775         return 0;
776 }
777
778
779 /*
780   handler for vacuum fetch
781 */
782 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
783                                  TDB_DATA data, void *private_data)
784 {
785         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
786         struct ctdb_marshall_buffer *recs;
787         int ret, i;
788         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
789         const char *name;
790         struct ctdb_dbid_map *dbmap=NULL;
791         bool persistent = false;
792         struct ctdb_db_context *ctdb_db;
793         struct ctdb_rec_data *r;
794         uint32_t srcnode;
795         struct vacuum_info *v;
796
797         recs = (struct ctdb_marshall_buffer *)data.dptr;
798         r = (struct ctdb_rec_data *)&recs->data[0];
799
800         if (recs->count == 0) {
801                 talloc_free(tmp_ctx);
802                 return;
803         }
804
805         srcnode = r->reqid;
806
807         for (v=rec->vacuum_info;v;v=v->next) {
808                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
809                         /* we're already working on records from this node */
810                         talloc_free(tmp_ctx);
811                         return;
812                 }
813         }
814
815         /* work out if the database is persistent */
816         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
817         if (ret != 0) {
818                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
819                 talloc_free(tmp_ctx);
820                 return;
821         }
822
823         for (i=0;i<dbmap->num;i++) {
824                 if (dbmap->dbs[i].dbid == recs->db_id) {
825                         persistent = dbmap->dbs[i].persistent;
826                         break;
827                 }
828         }
829         if (i == dbmap->num) {
830                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
831                 talloc_free(tmp_ctx);
832                 return;         
833         }
834
835         /* find the name of this database */
836         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
837                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
838                 talloc_free(tmp_ctx);
839                 return;
840         }
841
842         /* attach to it */
843         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
844         if (ctdb_db == NULL) {
845                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
846                 talloc_free(tmp_ctx);
847                 return;
848         }
849
850         v = talloc_zero(rec, struct vacuum_info);
851         if (v == NULL) {
852                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
853                 talloc_free(tmp_ctx);
854                 return;
855         }
856
857         v->rec = rec;
858         v->srcnode = srcnode;
859         v->ctdb_db = ctdb_db;
860         v->recs = talloc_memdup(v, recs, data.dsize);
861         if (v->recs == NULL) {
862                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
863                 talloc_free(v);
864                 talloc_free(tmp_ctx);
865                 return;         
866         }
867         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
868
869         DLIST_ADD(rec->vacuum_info, v);
870
871         talloc_set_destructor(v, vacuum_info_destructor);
872
873         vacuum_fetch_next(v);
874         talloc_free(tmp_ctx);
875 }
876
877
878 /*
879   called when ctdb_wait_timeout should finish
880  */
881 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
882                               struct timeval yt, void *p)
883 {
884         uint32_t *timed_out = (uint32_t *)p;
885         (*timed_out) = 1;
886 }
887
888 /*
889   wait for a given number of seconds
890  */
891 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
892 {
893         uint32_t timed_out = 0;
894         time_t usecs = (secs - (time_t)secs) * 1000000;
895         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
896         while (!timed_out) {
897                 event_loop_once(ctdb->ev);
898         }
899 }
900
901 /*
902   called when an election times out (ends)
903  */
904 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
905                                   struct timeval t, void *p)
906 {
907         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
908         rec->election_timeout = NULL;
909         fast_start = false;
910
911         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
912 }
913
914
915 /*
916   wait for an election to finish. It finished election_timeout seconds after
917   the last election packet is received
918  */
919 static void ctdb_wait_election(struct ctdb_recoverd *rec)
920 {
921         struct ctdb_context *ctdb = rec->ctdb;
922         while (rec->election_timeout) {
923                 event_loop_once(ctdb->ev);
924         }
925 }
926
927 /*
928   Update our local flags from all remote connected nodes. 
929   This is only run when we are or we belive we are the recovery master
930  */
931 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
932 {
933         int j;
934         struct ctdb_context *ctdb = rec->ctdb;
935         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
936
937         /* get the nodemap for all active remote nodes and verify
938            they are the same as for this node
939          */
940         for (j=0; j<nodemap->num; j++) {
941                 struct ctdb_node_map *remote_nodemap=NULL;
942                 int ret;
943
944                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
945                         continue;
946                 }
947                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
948                         continue;
949                 }
950
951                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
952                                            mem_ctx, &remote_nodemap);
953                 if (ret != 0) {
954                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
955                                   nodemap->nodes[j].pnn));
956                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
957                         talloc_free(mem_ctx);
958                         return MONITOR_FAILED;
959                 }
960                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
961                         /* We should tell our daemon about this so it
962                            updates its flags or else we will log the same 
963                            message again in the next iteration of recovery.
964                            Since we are the recovery master we can just as
965                            well update the flags on all nodes.
966                         */
967                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
968                         if (ret != 0) {
969                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
970                                 return -1;
971                         }
972
973                         /* Update our local copy of the flags in the recovery
974                            daemon.
975                         */
976                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
977                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
978                                  nodemap->nodes[j].flags));
979                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
980                 }
981                 talloc_free(remote_nodemap);
982         }
983         talloc_free(mem_ctx);
984         return MONITOR_OK;
985 }
986
987
988 /* Create a new random generation ip. 
989    The generation id can not be the INVALID_GENERATION id
990 */
991 static uint32_t new_generation(void)
992 {
993         uint32_t generation;
994
995         while (1) {
996                 generation = random();
997
998                 if (generation != INVALID_GENERATION) {
999                         break;
1000                 }
1001         }
1002
1003         return generation;
1004 }
1005
1006
1007 /*
1008   create a temporary working database
1009  */
1010 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1011 {
1012         char *name;
1013         struct tdb_wrap *recdb;
1014         unsigned tdb_flags;
1015
1016         /* open up the temporary recovery database */
1017         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1018                                ctdb->db_directory_state,
1019                                ctdb->pnn);
1020         if (name == NULL) {
1021                 return NULL;
1022         }
1023         unlink(name);
1024
1025         tdb_flags = TDB_NOLOCK;
1026         if (ctdb->valgrinding) {
1027                 tdb_flags |= TDB_NOMMAP;
1028         }
1029         tdb_flags |= TDB_DISALLOW_NESTING;
1030
1031         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1032                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1033         if (recdb == NULL) {
1034                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1035         }
1036
1037         talloc_free(name);
1038
1039         return recdb;
1040 }
1041
1042
1043 /* 
1044    a traverse function for pulling all relevent records from recdb
1045  */
1046 struct recdb_data {
1047         struct ctdb_context *ctdb;
1048         struct ctdb_marshall_buffer *recdata;
1049         uint32_t len;
1050         bool failed;
1051         bool persistent;
1052 };
1053
1054 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1055 {
1056         struct recdb_data *params = (struct recdb_data *)p;
1057         struct ctdb_rec_data *rec;
1058         struct ctdb_ltdb_header *hdr;
1059
1060         /* skip empty records */
1061         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1062                 return 0;
1063         }
1064
1065         /* update the dmaster field to point to us */
1066         hdr = (struct ctdb_ltdb_header *)data.dptr;
1067         if (!params->persistent) {
1068                 hdr->dmaster = params->ctdb->pnn;
1069         }
1070
1071         /* add the record to the blob ready to send to the nodes */
1072         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1073         if (rec == NULL) {
1074                 params->failed = true;
1075                 return -1;
1076         }
1077         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1078         if (params->recdata == NULL) {
1079                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1080                          rec->length + params->len, params->recdata->count));
1081                 params->failed = true;
1082                 return -1;
1083         }
1084         params->recdata->count++;
1085         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1086         params->len += rec->length;
1087         talloc_free(rec);
1088
1089         return 0;
1090 }
1091
1092 /*
1093   push the recdb database out to all nodes
1094  */
1095 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1096                                bool persistent,
1097                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1098 {
1099         struct recdb_data params;
1100         struct ctdb_marshall_buffer *recdata;
1101         TDB_DATA outdata;
1102         TALLOC_CTX *tmp_ctx;
1103         uint32_t *nodes;
1104
1105         tmp_ctx = talloc_new(ctdb);
1106         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1107
1108         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1109         CTDB_NO_MEMORY(ctdb, recdata);
1110
1111         recdata->db_id = dbid;
1112
1113         params.ctdb = ctdb;
1114         params.recdata = recdata;
1115         params.len = offsetof(struct ctdb_marshall_buffer, data);
1116         params.failed = false;
1117         params.persistent = persistent;
1118
1119         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1120                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1121                 talloc_free(params.recdata);
1122                 talloc_free(tmp_ctx);
1123                 return -1;
1124         }
1125
1126         if (params.failed) {
1127                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1128                 talloc_free(params.recdata);
1129                 talloc_free(tmp_ctx);
1130                 return -1;              
1131         }
1132
1133         recdata = params.recdata;
1134
1135         outdata.dptr = (void *)recdata;
1136         outdata.dsize = params.len;
1137
1138         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1139         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1140                                         nodes, 0,
1141                                         CONTROL_TIMEOUT(), false, outdata,
1142                                         NULL, NULL,
1143                                         NULL) != 0) {
1144                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1145                 talloc_free(recdata);
1146                 talloc_free(tmp_ctx);
1147                 return -1;
1148         }
1149
1150         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1151                   dbid, recdata->count));
1152
1153         talloc_free(recdata);
1154         talloc_free(tmp_ctx);
1155
1156         return 0;
1157 }
1158
1159
1160 /*
1161   go through a full recovery on one database 
1162  */
1163 static int recover_database(struct ctdb_recoverd *rec, 
1164                             TALLOC_CTX *mem_ctx,
1165                             uint32_t dbid,
1166                             bool persistent,
1167                             uint32_t pnn, 
1168                             struct ctdb_node_map *nodemap,
1169                             uint32_t transaction_id)
1170 {
1171         struct tdb_wrap *recdb;
1172         int ret;
1173         struct ctdb_context *ctdb = rec->ctdb;
1174         TDB_DATA data;
1175         struct ctdb_control_wipe_database w;
1176         uint32_t *nodes;
1177
1178         recdb = create_recdb(ctdb, mem_ctx);
1179         if (recdb == NULL) {
1180                 return -1;
1181         }
1182
1183         /* pull all remote databases onto the recdb */
1184         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1185         if (ret != 0) {
1186                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1187                 return -1;
1188         }
1189
1190         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1191
1192         /* wipe all the remote databases. This is safe as we are in a transaction */
1193         w.db_id = dbid;
1194         w.transaction_id = transaction_id;
1195
1196         data.dptr = (void *)&w;
1197         data.dsize = sizeof(w);
1198
1199         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1200         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1201                                         nodes, 0,
1202                                         CONTROL_TIMEOUT(), false, data,
1203                                         NULL, NULL,
1204                                         NULL) != 0) {
1205                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1206                 talloc_free(recdb);
1207                 return -1;
1208         }
1209         
1210         /* push out the correct database. This sets the dmaster and skips 
1211            the empty records */
1212         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1213         if (ret != 0) {
1214                 talloc_free(recdb);
1215                 return -1;
1216         }
1217
1218         /* all done with this database */
1219         talloc_free(recdb);
1220
1221         return 0;
1222 }
1223
1224 /*
1225   reload the nodes file 
1226 */
1227 static void reload_nodes_file(struct ctdb_context *ctdb)
1228 {
1229         ctdb->nodes = NULL;
1230         ctdb_load_nodes_file(ctdb);
1231 }
1232
1233 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1234                                          struct ctdb_recoverd *rec,
1235                                          struct ctdb_node_map *nodemap,
1236                                          uint32_t *culprit)
1237 {
1238         int j;
1239         int ret;
1240
1241         if (ctdb->num_nodes != nodemap->num) {
1242                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1243                                   ctdb->num_nodes, nodemap->num));
1244                 if (culprit) {
1245                         *culprit = ctdb->pnn;
1246                 }
1247                 return -1;
1248         }
1249
1250         for (j=0; j<nodemap->num; j++) {
1251                 /* release any existing data */
1252                 if (ctdb->nodes[j]->known_public_ips) {
1253                         talloc_free(ctdb->nodes[j]->known_public_ips);
1254                         ctdb->nodes[j]->known_public_ips = NULL;
1255                 }
1256                 if (ctdb->nodes[j]->available_public_ips) {
1257                         talloc_free(ctdb->nodes[j]->available_public_ips);
1258                         ctdb->nodes[j]->available_public_ips = NULL;
1259                 }
1260
1261                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1262                         continue;
1263                 }
1264
1265                 /* grab a new shiny list of public ips from the node */
1266                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1267                                         CONTROL_TIMEOUT(),
1268                                         ctdb->nodes[j]->pnn,
1269                                         ctdb->nodes,
1270                                         0,
1271                                         &ctdb->nodes[j]->known_public_ips);
1272                 if (ret != 0) {
1273                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1274                                 ctdb->nodes[j]->pnn));
1275                         if (culprit) {
1276                                 *culprit = ctdb->nodes[j]->pnn;
1277                         }
1278                         return -1;
1279                 }
1280
1281                 if (ctdb->tunable.disable_ip_failover == 0) {
1282                         if (rec->ip_check_disable_ctx == NULL) {
1283                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1284                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1285                                         rec->need_takeover_run = true;
1286                                 }
1287                         }
1288                 }
1289
1290                 /* grab a new shiny list of public ips from the node */
1291                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1292                                         CONTROL_TIMEOUT(),
1293                                         ctdb->nodes[j]->pnn,
1294                                         ctdb->nodes,
1295                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1296                                         &ctdb->nodes[j]->available_public_ips);
1297                 if (ret != 0) {
1298                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1299                                 ctdb->nodes[j]->pnn));
1300                         if (culprit) {
1301                                 *culprit = ctdb->nodes[j]->pnn;
1302                         }
1303                         return -1;
1304                 }
1305         }
1306
1307         return 0;
1308 }
1309
1310 /* when we start a recovery, make sure all nodes use the same reclock file
1311    setting
1312 */
1313 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1314 {
1315         struct ctdb_context *ctdb = rec->ctdb;
1316         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1317         TDB_DATA data;
1318         uint32_t *nodes;
1319
1320         if (ctdb->recovery_lock_file == NULL) {
1321                 data.dptr  = NULL;
1322                 data.dsize = 0;
1323         } else {
1324                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1325                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1326         }
1327
1328         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1329         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1330                                         nodes, 0,
1331                                         CONTROL_TIMEOUT(),
1332                                         false, data,
1333                                         NULL, NULL,
1334                                         rec) != 0) {
1335                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1336                 talloc_free(tmp_ctx);
1337                 return -1;
1338         }
1339
1340         talloc_free(tmp_ctx);
1341         return 0;
1342 }
1343
1344
1345 /*
1346   we are the recmaster, and recovery is needed - start a recovery run
1347  */
1348 static int do_recovery(struct ctdb_recoverd *rec, 
1349                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1350                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1351 {
1352         struct ctdb_context *ctdb = rec->ctdb;
1353         int i, j, ret;
1354         uint32_t generation;
1355         struct ctdb_dbid_map *dbmap;
1356         TDB_DATA data;
1357         uint32_t *nodes;
1358         struct timeval start_time;
1359         uint32_t culprit = (uint32_t)-1;
1360
1361         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1362
1363         /* if recovery fails, force it again */
1364         rec->need_recovery = true;
1365
1366         for (i=0; i<ctdb->num_nodes; i++) {
1367                 struct ctdb_banning_state *ban_state;
1368
1369                 if (ctdb->nodes[i]->ban_state == NULL) {
1370                         continue;
1371                 }
1372                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1373                 if (ban_state->count < 2*ctdb->num_nodes) {
1374                         continue;
1375                 }
1376                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1377                         ctdb->nodes[i]->pnn, ban_state->count,
1378                         ctdb->tunable.recovery_ban_period));
1379                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1380                 ban_state->count = 0;
1381         }
1382
1383
1384         if (ctdb->tunable.verify_recovery_lock != 0) {
1385                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1386                 start_time = timeval_current();
1387                 if (!ctdb_recovery_lock(ctdb, true)) {
1388                         ctdb_set_culprit(rec, pnn);
1389                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1390                         return -1;
1391                 }
1392                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1393                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1394         }
1395
1396         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1397
1398         /* get a list of all databases */
1399         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1400         if (ret != 0) {
1401                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1402                 return -1;
1403         }
1404
1405         /* we do the db creation before we set the recovery mode, so the freeze happens
1406            on all databases we will be dealing with. */
1407
1408         /* verify that we have all the databases any other node has */
1409         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1410         if (ret != 0) {
1411                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1412                 return -1;
1413         }
1414
1415         /* verify that all other nodes have all our databases */
1416         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1417         if (ret != 0) {
1418                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1419                 return -1;
1420         }
1421         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1422
1423         /* update the database priority for all remote databases */
1424         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1425         if (ret != 0) {
1426                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1427         }
1428         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1429
1430
1431         /* update all other nodes to use the same setting for reclock files
1432            as the local recovery master.
1433         */
1434         sync_recovery_lock_file_across_cluster(rec);
1435
1436         /* set recovery mode to active on all nodes */
1437         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1438         if (ret != 0) {
1439                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1440                 return -1;
1441         }
1442
1443         /* execute the "startrecovery" event script on all nodes */
1444         ret = run_startrecovery_eventscript(rec, nodemap);
1445         if (ret!=0) {
1446                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1447                 return -1;
1448         }
1449
1450         /*
1451           update all nodes to have the same flags that we have
1452          */
1453         for (i=0;i<nodemap->num;i++) {
1454                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1455                         continue;
1456                 }
1457
1458                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1459                 if (ret != 0) {
1460                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1461                         return -1;
1462                 }
1463         }
1464
1465         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1466
1467         /* pick a new generation number */
1468         generation = new_generation();
1469
1470         /* change the vnnmap on this node to use the new generation 
1471            number but not on any other nodes.
1472            this guarantees that if we abort the recovery prematurely
1473            for some reason (a node stops responding?)
1474            that we can just return immediately and we will reenter
1475            recovery shortly again.
1476            I.e. we deliberately leave the cluster with an inconsistent
1477            generation id to allow us to abort recovery at any stage and
1478            just restart it from scratch.
1479          */
1480         vnnmap->generation = generation;
1481         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1482         if (ret != 0) {
1483                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1484                 return -1;
1485         }
1486
1487         data.dptr = (void *)&generation;
1488         data.dsize = sizeof(uint32_t);
1489
1490         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1491         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1492                                         nodes, 0,
1493                                         CONTROL_TIMEOUT(), false, data,
1494                                         NULL,
1495                                         transaction_start_fail_callback,
1496                                         rec) != 0) {
1497                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1498                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1499                                         nodes, 0,
1500                                         CONTROL_TIMEOUT(), false, tdb_null,
1501                                         NULL,
1502                                         NULL,
1503                                         NULL) != 0) {
1504                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1505                 }
1506                 return -1;
1507         }
1508
1509         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1510
1511         for (i=0;i<dbmap->num;i++) {
1512                 ret = recover_database(rec, mem_ctx,
1513                                        dbmap->dbs[i].dbid,
1514                                        dbmap->dbs[i].persistent,
1515                                        pnn, nodemap, generation);
1516                 if (ret != 0) {
1517                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1518                         return -1;
1519                 }
1520         }
1521
1522         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1523
1524         /* commit all the changes */
1525         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1526                                         nodes, 0,
1527                                         CONTROL_TIMEOUT(), false, data,
1528                                         NULL, NULL,
1529                                         NULL) != 0) {
1530                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1531                 return -1;
1532         }
1533
1534         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1535         
1536
1537         /* update the capabilities for all nodes */
1538         ret = update_capabilities(ctdb, nodemap);
1539         if (ret!=0) {
1540                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1541                 return -1;
1542         }
1543
1544         /* build a new vnn map with all the currently active and
1545            unbanned nodes */
1546         generation = new_generation();
1547         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1548         CTDB_NO_MEMORY(ctdb, vnnmap);
1549         vnnmap->generation = generation;
1550         vnnmap->size = 0;
1551         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1552         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1553         for (i=j=0;i<nodemap->num;i++) {
1554                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1555                         continue;
1556                 }
1557                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1558                         /* this node can not be an lmaster */
1559                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1560                         continue;
1561                 }
1562
1563                 vnnmap->size++;
1564                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1565                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1566                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1567
1568         }
1569         if (vnnmap->size == 0) {
1570                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1571                 vnnmap->size++;
1572                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1573                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1574                 vnnmap->map[0] = pnn;
1575         }       
1576
1577         /* update to the new vnnmap on all nodes */
1578         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1579         if (ret != 0) {
1580                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1581                 return -1;
1582         }
1583
1584         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1585
1586         /* update recmaster to point to us for all nodes */
1587         ret = set_recovery_master(ctdb, nodemap, pnn);
1588         if (ret!=0) {
1589                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1590                 return -1;
1591         }
1592
1593         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1594
1595         /*
1596           update all nodes to have the same flags that we have
1597          */
1598         for (i=0;i<nodemap->num;i++) {
1599                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1600                         continue;
1601                 }
1602
1603                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1604                 if (ret != 0) {
1605                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1606                         return -1;
1607                 }
1608         }
1609
1610         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1611
1612         /* disable recovery mode */
1613         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1614         if (ret != 0) {
1615                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1616                 return -1;
1617         }
1618
1619         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1620
1621         /*
1622           tell nodes to takeover their public IPs
1623          */
1624         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1625         if (ret != 0) {
1626                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1627                                  culprit));
1628                 rec->need_takeover_run = true;
1629                 return -1;
1630         }
1631         rec->need_takeover_run = false;
1632         ret = ctdb_takeover_run(ctdb, nodemap);
1633         if (ret != 0) {
1634                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1635                 rec->need_takeover_run = true;
1636         }
1637         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1638
1639         /* execute the "recovered" event script on all nodes */
1640         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1641         if (ret!=0) {
1642                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1643                 return -1;
1644         }
1645
1646         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1647
1648         /* send a message to all clients telling them that the cluster 
1649            has been reconfigured */
1650         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1651
1652         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1653
1654         rec->need_recovery = false;
1655
1656         /* we managed to complete a full recovery, make sure to forgive
1657            any past sins by the nodes that could now participate in the
1658            recovery.
1659         */
1660         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1661         for (i=0;i<nodemap->num;i++) {
1662                 struct ctdb_banning_state *ban_state;
1663
1664                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1665                         continue;
1666                 }
1667
1668                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1669                 if (ban_state == NULL) {
1670                         continue;
1671                 }
1672
1673                 ban_state->count = 0;
1674         }
1675
1676
1677         /* We just finished a recovery successfully. 
1678            We now wait for rerecovery_timeout before we allow 
1679            another recovery to take place.
1680         */
1681         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1682         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1683         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1684
1685         return 0;
1686 }
1687
1688
1689 /*
1690   elections are won by first checking the number of connected nodes, then
1691   the priority time, then the pnn
1692  */
1693 struct election_message {
1694         uint32_t num_connected;
1695         struct timeval priority_time;
1696         uint32_t pnn;
1697         uint32_t node_flags;
1698 };
1699
1700 /*
1701   form this nodes election data
1702  */
1703 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1704 {
1705         int ret, i;
1706         struct ctdb_node_map *nodemap;
1707         struct ctdb_context *ctdb = rec->ctdb;
1708
1709         ZERO_STRUCTP(em);
1710
1711         em->pnn = rec->ctdb->pnn;
1712         em->priority_time = rec->priority_time;
1713
1714         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1715         if (ret != 0) {
1716                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1717                 return;
1718         }
1719
1720         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1721         em->node_flags = rec->node_flags;
1722
1723         for (i=0;i<nodemap->num;i++) {
1724                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1725                         em->num_connected++;
1726                 }
1727         }
1728
1729         /* we shouldnt try to win this election if we cant be a recmaster */
1730         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1731                 em->num_connected = 0;
1732                 em->priority_time = timeval_current();
1733         }
1734
1735         talloc_free(nodemap);
1736 }
1737
1738 /*
1739   see if the given election data wins
1740  */
1741 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1742 {
1743         struct election_message myem;
1744         int cmp = 0;
1745
1746         ctdb_election_data(rec, &myem);
1747
1748         /* we cant win if we dont have the recmaster capability */
1749         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1750                 return false;
1751         }
1752
1753         /* we cant win if we are banned */
1754         if (rec->node_flags & NODE_FLAGS_BANNED) {
1755                 return false;
1756         }       
1757
1758         /* we cant win if we are stopped */
1759         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1760                 return false;
1761         }       
1762
1763         /* we will automatically win if the other node is banned */
1764         if (em->node_flags & NODE_FLAGS_BANNED) {
1765                 return true;
1766         }
1767
1768         /* we will automatically win if the other node is banned */
1769         if (em->node_flags & NODE_FLAGS_STOPPED) {
1770                 return true;
1771         }
1772
1773         /* try to use the most connected node */
1774         if (cmp == 0) {
1775                 cmp = (int)myem.num_connected - (int)em->num_connected;
1776         }
1777
1778         /* then the longest running node */
1779         if (cmp == 0) {
1780                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1781         }
1782
1783         if (cmp == 0) {
1784                 cmp = (int)myem.pnn - (int)em->pnn;
1785         }
1786
1787         return cmp > 0;
1788 }
1789
1790 /*
1791   send out an election request
1792  */
1793 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1794 {
1795         int ret;
1796         TDB_DATA election_data;
1797         struct election_message emsg;
1798         uint64_t srvid;
1799         struct ctdb_context *ctdb = rec->ctdb;
1800
1801         srvid = CTDB_SRVID_RECOVERY;
1802
1803         ctdb_election_data(rec, &emsg);
1804
1805         election_data.dsize = sizeof(struct election_message);
1806         election_data.dptr  = (unsigned char *)&emsg;
1807
1808
1809         /* send an election message to all active nodes */
1810         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1811         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1812
1813
1814         /* A new node that is already frozen has entered the cluster.
1815            The existing nodes are not frozen and dont need to be frozen
1816            until the election has ended and we start the actual recovery
1817         */
1818         if (update_recmaster == true) {
1819                 /* first we assume we will win the election and set 
1820                    recoverymaster to be ourself on the current node
1821                  */
1822                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1823                 if (ret != 0) {
1824                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1825                         return -1;
1826                 }
1827         }
1828
1829
1830         return 0;
1831 }
1832
1833 /*
1834   this function will unban all nodes in the cluster
1835 */
1836 static void unban_all_nodes(struct ctdb_context *ctdb)
1837 {
1838         int ret, i;
1839         struct ctdb_node_map *nodemap;
1840         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1841         
1842         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1843         if (ret != 0) {
1844                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1845                 return;
1846         }
1847
1848         for (i=0;i<nodemap->num;i++) {
1849                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1850                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1851                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1852                 }
1853         }
1854
1855         talloc_free(tmp_ctx);
1856 }
1857
1858
1859 /*
1860   we think we are winning the election - send a broadcast election request
1861  */
1862 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1863 {
1864         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1865         int ret;
1866
1867         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1868         if (ret != 0) {
1869                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1870         }
1871
1872         talloc_free(rec->send_election_te);
1873         rec->send_election_te = NULL;
1874 }
1875
1876 /*
1877   handler for memory dumps
1878 */
1879 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1880                              TDB_DATA data, void *private_data)
1881 {
1882         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1883         TDB_DATA *dump;
1884         int ret;
1885         struct rd_memdump_reply *rd;
1886
1887         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1888                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1889                 talloc_free(tmp_ctx);
1890                 return;
1891         }
1892         rd = (struct rd_memdump_reply *)data.dptr;
1893
1894         dump = talloc_zero(tmp_ctx, TDB_DATA);
1895         if (dump == NULL) {
1896                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1897                 talloc_free(tmp_ctx);
1898                 return;
1899         }
1900         ret = ctdb_dump_memory(ctdb, dump);
1901         if (ret != 0) {
1902                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1903                 talloc_free(tmp_ctx);
1904                 return;
1905         }
1906
1907 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1908
1909         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1910         if (ret != 0) {
1911                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1912                 talloc_free(tmp_ctx);
1913                 return;
1914         }
1915
1916         talloc_free(tmp_ctx);
1917 }
1918
1919 /*
1920   handler for reload_nodes
1921 */
1922 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1923                              TDB_DATA data, void *private_data)
1924 {
1925         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1926
1927         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1928
1929         reload_nodes_file(rec->ctdb);
1930 }
1931
1932
1933 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1934                               struct timeval yt, void *p)
1935 {
1936         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1937
1938         talloc_free(rec->ip_check_disable_ctx);
1939         rec->ip_check_disable_ctx = NULL;
1940 }
1941
1942
1943 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1944                              TDB_DATA data, void *private_data)
1945 {
1946         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1947         struct ctdb_public_ip *ip;
1948
1949         if (rec->recmaster != rec->ctdb->pnn) {
1950                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1951                 return;
1952         }
1953
1954         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1955                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1956                 return;
1957         }
1958
1959         ip = (struct ctdb_public_ip *)data.dptr;
1960
1961         update_ip_assignment_tree(rec->ctdb, ip);
1962 }
1963
1964
1965 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1966                              TDB_DATA data, void *private_data)
1967 {
1968         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1969         uint32_t timeout;
1970
1971         if (rec->ip_check_disable_ctx != NULL) {
1972                 talloc_free(rec->ip_check_disable_ctx);
1973                 rec->ip_check_disable_ctx = NULL;
1974         }
1975
1976         if (data.dsize != sizeof(uint32_t)) {
1977                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1978                                  "expexting %lu\n", (long unsigned)data.dsize,
1979                                  (long unsigned)sizeof(uint32_t)));
1980                 return;
1981         }
1982         if (data.dptr == NULL) {
1983                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
1984                 return;
1985         }
1986
1987         timeout = *((uint32_t *)data.dptr);
1988         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1989
1990         rec->ip_check_disable_ctx = talloc_new(rec);
1991         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1992
1993         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1994 }
1995
1996
1997 /*
1998   handler for ip reallocate, just add it to the list of callers and 
1999   handle this later in the monitor_cluster loop so we do not recurse
2000   with other callers to takeover_run()
2001 */
2002 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2003                              TDB_DATA data, void *private_data)
2004 {
2005         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2006         struct ip_reallocate_list *caller;
2007
2008         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2009                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2010                 return;
2011         }
2012
2013         if (rec->ip_reallocate_ctx == NULL) {
2014                 rec->ip_reallocate_ctx = talloc_new(rec);
2015                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2016         }
2017
2018         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2019         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2020
2021         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2022         caller->next = rec->reallocate_callers;
2023         rec->reallocate_callers = caller;
2024
2025         return;
2026 }
2027
2028 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2029 {
2030         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2031         TDB_DATA result;
2032         int32_t ret;
2033         struct ip_reallocate_list *callers;
2034         uint32_t culprit;
2035
2036         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2037
2038         /* update the list of public ips that a node can handle for
2039            all connected nodes
2040         */
2041         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2042         if (ret != 0) {
2043                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2044                                  culprit));
2045                 rec->need_takeover_run = true;
2046         }
2047         if (ret == 0) {
2048                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2049                 if (ret != 0) {
2050                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2051                         rec->need_takeover_run = true;
2052                 }
2053         }
2054
2055         result.dsize = sizeof(int32_t);
2056         result.dptr  = (uint8_t *)&ret;
2057
2058         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2059
2060                 /* Someone that sent srvid==0 does not want a reply */
2061                 if (callers->rd->srvid == 0) {
2062                         continue;
2063                 }
2064                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2065                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2066                                   (unsigned long long)callers->rd->srvid));
2067                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2068                 if (ret != 0) {
2069                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2070                                          "message to %u:%llu\n",
2071                                          (unsigned)callers->rd->pnn,
2072                                          (unsigned long long)callers->rd->srvid));
2073                 }
2074         }
2075
2076         talloc_free(tmp_ctx);
2077         talloc_free(rec->ip_reallocate_ctx);
2078         rec->ip_reallocate_ctx = NULL;
2079         rec->reallocate_callers = NULL;
2080         
2081 }
2082
2083
2084 /*
2085   handler for recovery master elections
2086 */
2087 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2088                              TDB_DATA data, void *private_data)
2089 {
2090         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2091         int ret;
2092         struct election_message *em = (struct election_message *)data.dptr;
2093         TALLOC_CTX *mem_ctx;
2094
2095         /* we got an election packet - update the timeout for the election */
2096         talloc_free(rec->election_timeout);
2097         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2098                                                 fast_start ?
2099                                                 timeval_current_ofs(0, 500000) :
2100                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2101                                                 ctdb_election_timeout, rec);
2102
2103         mem_ctx = talloc_new(ctdb);
2104
2105         /* someone called an election. check their election data
2106            and if we disagree and we would rather be the elected node, 
2107            send a new election message to all other nodes
2108          */
2109         if (ctdb_election_win(rec, em)) {
2110                 if (!rec->send_election_te) {
2111                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2112                                                                 timeval_current_ofs(0, 500000),
2113                                                                 election_send_request, rec);
2114                 }
2115                 talloc_free(mem_ctx);
2116                 /*unban_all_nodes(ctdb);*/
2117                 return;
2118         }
2119         
2120         /* we didn't win */
2121         talloc_free(rec->send_election_te);
2122         rec->send_election_te = NULL;
2123
2124         if (ctdb->tunable.verify_recovery_lock != 0) {
2125                 /* release the recmaster lock */
2126                 if (em->pnn != ctdb->pnn &&
2127                     ctdb->recovery_lock_fd != -1) {
2128                         close(ctdb->recovery_lock_fd);
2129                         ctdb->recovery_lock_fd = -1;
2130                         unban_all_nodes(ctdb);
2131                 }
2132         }
2133
2134         /* ok, let that guy become recmaster then */
2135         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2136         if (ret != 0) {
2137                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2138                 talloc_free(mem_ctx);
2139                 return;
2140         }
2141
2142         talloc_free(mem_ctx);
2143         return;
2144 }
2145
2146
2147 /*
2148   force the start of the election process
2149  */
2150 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2151                            struct ctdb_node_map *nodemap)
2152 {
2153         int ret;
2154         struct ctdb_context *ctdb = rec->ctdb;
2155
2156         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2157
2158         /* set all nodes to recovery mode to stop all internode traffic */
2159         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2160         if (ret != 0) {
2161                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2162                 return;
2163         }
2164
2165         talloc_free(rec->election_timeout);
2166         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2167                                                 fast_start ?
2168                                                 timeval_current_ofs(0, 500000) :
2169                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2170                                                 ctdb_election_timeout, rec);
2171
2172         ret = send_election_request(rec, pnn, true);
2173         if (ret!=0) {
2174                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2175                 return;
2176         }
2177
2178         /* wait for a few seconds to collect all responses */
2179         ctdb_wait_election(rec);
2180 }
2181
2182
2183
2184 /*
2185   handler for when a node changes its flags
2186 */
2187 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2188                             TDB_DATA data, void *private_data)
2189 {
2190         int ret;
2191         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2192         struct ctdb_node_map *nodemap=NULL;
2193         TALLOC_CTX *tmp_ctx;
2194         uint32_t changed_flags;
2195         int i;
2196         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2197         int disabled_flag_changed;
2198
2199         if (data.dsize != sizeof(*c)) {
2200                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2201                 return;
2202         }
2203
2204         tmp_ctx = talloc_new(ctdb);
2205         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2206
2207         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2208         if (ret != 0) {
2209                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2210                 talloc_free(tmp_ctx);
2211                 return;         
2212         }
2213
2214
2215         for (i=0;i<nodemap->num;i++) {
2216                 if (nodemap->nodes[i].pnn == c->pnn) break;
2217         }
2218
2219         if (i == nodemap->num) {
2220                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2221                 talloc_free(tmp_ctx);
2222                 return;
2223         }
2224
2225         changed_flags = c->old_flags ^ c->new_flags;
2226
2227         if (nodemap->nodes[i].flags != c->new_flags) {
2228                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2229         }
2230
2231         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2232
2233         nodemap->nodes[i].flags = c->new_flags;
2234
2235         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2236                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2237
2238         if (ret == 0) {
2239                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2240                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2241         }
2242         
2243         if (ret == 0 &&
2244             ctdb->recovery_master == ctdb->pnn &&
2245             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2246                 /* Only do the takeover run if the perm disabled or unhealthy
2247                    flags changed since these will cause an ip failover but not
2248                    a recovery.
2249                    If the node became disconnected or banned this will also
2250                    lead to an ip address failover but that is handled 
2251                    during recovery
2252                 */
2253                 if (disabled_flag_changed) {
2254                         rec->need_takeover_run = true;
2255                 }
2256         }
2257
2258         talloc_free(tmp_ctx);
2259 }
2260
2261 /*
2262   handler for when we need to push out flag changes ot all other nodes
2263 */
2264 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2265                             TDB_DATA data, void *private_data)
2266 {
2267         int ret;
2268         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2269         struct ctdb_node_map *nodemap=NULL;
2270         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2271         uint32_t recmaster;
2272         uint32_t *nodes;
2273
2274         /* find the recovery master */
2275         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2276         if (ret != 0) {
2277                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2278                 talloc_free(tmp_ctx);
2279                 return;
2280         }
2281
2282         /* read the node flags from the recmaster */
2283         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2284         if (ret != 0) {
2285                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2286                 talloc_free(tmp_ctx);
2287                 return;
2288         }
2289         if (c->pnn >= nodemap->num) {
2290                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2291                 talloc_free(tmp_ctx);
2292                 return;
2293         }
2294
2295         /* send the flags update to all connected nodes */
2296         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2297
2298         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2299                                       nodes, 0, CONTROL_TIMEOUT(),
2300                                       false, data,
2301                                       NULL, NULL,
2302                                       NULL) != 0) {
2303                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2304
2305                 talloc_free(tmp_ctx);
2306                 return;
2307         }
2308
2309         talloc_free(tmp_ctx);
2310 }
2311
2312
2313 struct verify_recmode_normal_data {
2314         uint32_t count;
2315         enum monitor_result status;
2316 };
2317
2318 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2319 {
2320         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2321
2322
2323         /* one more node has responded with recmode data*/
2324         rmdata->count--;
2325
2326         /* if we failed to get the recmode, then return an error and let
2327            the main loop try again.
2328         */
2329         if (state->state != CTDB_CONTROL_DONE) {
2330                 if (rmdata->status == MONITOR_OK) {
2331                         rmdata->status = MONITOR_FAILED;
2332                 }
2333                 return;
2334         }
2335
2336         /* if we got a response, then the recmode will be stored in the
2337            status field
2338         */
2339         if (state->status != CTDB_RECOVERY_NORMAL) {
2340                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2341                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2342         }
2343
2344         return;
2345 }
2346
2347
2348 /* verify that all nodes are in normal recovery mode */
2349 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2350 {
2351         struct verify_recmode_normal_data *rmdata;
2352         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2353         struct ctdb_client_control_state *state;
2354         enum monitor_result status;
2355         int j;
2356         
2357         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2358         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2359         rmdata->count  = 0;
2360         rmdata->status = MONITOR_OK;
2361
2362         /* loop over all active nodes and send an async getrecmode call to 
2363            them*/
2364         for (j=0; j<nodemap->num; j++) {
2365                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2366                         continue;
2367                 }
2368                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2369                                         CONTROL_TIMEOUT(), 
2370                                         nodemap->nodes[j].pnn);
2371                 if (state == NULL) {
2372                         /* we failed to send the control, treat this as 
2373                            an error and try again next iteration
2374                         */                      
2375                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2376                         talloc_free(mem_ctx);
2377                         return MONITOR_FAILED;
2378                 }
2379
2380                 /* set up the callback functions */
2381                 state->async.fn = verify_recmode_normal_callback;
2382                 state->async.private_data = rmdata;
2383
2384                 /* one more control to wait for to complete */
2385                 rmdata->count++;
2386         }
2387
2388
2389         /* now wait for up to the maximum number of seconds allowed
2390            or until all nodes we expect a response from has replied
2391         */
2392         while (rmdata->count > 0) {
2393                 event_loop_once(ctdb->ev);
2394         }
2395
2396         status = rmdata->status;
2397         talloc_free(mem_ctx);
2398         return status;
2399 }
2400
2401
2402 struct verify_recmaster_data {
2403         struct ctdb_recoverd *rec;
2404         uint32_t count;
2405         uint32_t pnn;
2406         enum monitor_result status;
2407 };
2408
2409 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2410 {
2411         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2412
2413
2414         /* one more node has responded with recmaster data*/
2415         rmdata->count--;
2416
2417         /* if we failed to get the recmaster, then return an error and let
2418            the main loop try again.
2419         */
2420         if (state->state != CTDB_CONTROL_DONE) {
2421                 if (rmdata->status == MONITOR_OK) {
2422                         rmdata->status = MONITOR_FAILED;
2423                 }
2424                 return;
2425         }
2426
2427         /* if we got a response, then the recmaster will be stored in the
2428            status field
2429         */
2430         if (state->status != rmdata->pnn) {
2431                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2432                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2433                 rmdata->status = MONITOR_ELECTION_NEEDED;
2434         }
2435
2436         return;
2437 }
2438
2439
2440 /* verify that all nodes agree that we are the recmaster */
2441 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2442 {
2443         struct ctdb_context *ctdb = rec->ctdb;
2444         struct verify_recmaster_data *rmdata;
2445         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2446         struct ctdb_client_control_state *state;
2447         enum monitor_result status;
2448         int j;
2449         
2450         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2451         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2452         rmdata->rec    = rec;
2453         rmdata->count  = 0;
2454         rmdata->pnn    = pnn;
2455         rmdata->status = MONITOR_OK;
2456
2457         /* loop over all active nodes and send an async getrecmaster call to 
2458            them*/
2459         for (j=0; j<nodemap->num; j++) {
2460                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2461                         continue;
2462                 }
2463                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2464                                         CONTROL_TIMEOUT(),
2465                                         nodemap->nodes[j].pnn);
2466                 if (state == NULL) {
2467                         /* we failed to send the control, treat this as 
2468                            an error and try again next iteration
2469                         */                      
2470                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2471                         talloc_free(mem_ctx);
2472                         return MONITOR_FAILED;
2473                 }
2474
2475                 /* set up the callback functions */
2476                 state->async.fn = verify_recmaster_callback;
2477                 state->async.private_data = rmdata;
2478
2479                 /* one more control to wait for to complete */
2480                 rmdata->count++;
2481         }
2482
2483
2484         /* now wait for up to the maximum number of seconds allowed
2485            or until all nodes we expect a response from has replied
2486         */
2487         while (rmdata->count > 0) {
2488                 event_loop_once(ctdb->ev);
2489         }
2490
2491         status = rmdata->status;
2492         talloc_free(mem_ctx);
2493         return status;
2494 }
2495
2496
2497 /* called to check that the local allocation of public ip addresses is ok.
2498 */
2499 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2500 {
2501         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2502         struct ctdb_control_get_ifaces *ifaces = NULL;
2503         struct ctdb_all_public_ips *ips = NULL;
2504         struct ctdb_uptime *uptime1 = NULL;
2505         struct ctdb_uptime *uptime2 = NULL;
2506         int ret, j;
2507         bool need_iface_check = false;
2508         bool need_takeover_run = false;
2509
2510         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2511                                 CTDB_CURRENT_NODE, &uptime1);
2512         if (ret != 0) {
2513                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2514                 talloc_free(mem_ctx);
2515                 return -1;
2516         }
2517
2518
2519         /* read the interfaces from the local node */
2520         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2521         if (ret != 0) {
2522                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2523                 talloc_free(mem_ctx);
2524                 return -1;
2525         }
2526
2527         if (!rec->ifaces) {
2528                 need_iface_check = true;
2529         } else if (rec->ifaces->num != ifaces->num) {
2530                 need_iface_check = true;
2531         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2532                 need_iface_check = true;
2533         }
2534
2535         if (need_iface_check) {
2536                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2537                                      "local node %u - force takeover run\n",
2538                                      pnn));
2539                 need_takeover_run = true;
2540         }
2541
2542         /* read the ip allocation from the local node */
2543         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2544         if (ret != 0) {
2545                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2546                 talloc_free(mem_ctx);
2547                 return -1;
2548         }
2549
2550         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2551                                 CTDB_CURRENT_NODE, &uptime2);
2552         if (ret != 0) {
2553                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2554                 talloc_free(mem_ctx);
2555                 return -1;
2556         }
2557
2558         /* skip the check if the startrecovery time has changed */
2559         if (timeval_compare(&uptime1->last_recovery_started,
2560                             &uptime2->last_recovery_started) != 0) {
2561                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2562                 talloc_free(mem_ctx);
2563                 return 0;
2564         }
2565
2566         /* skip the check if the endrecovery time has changed */
2567         if (timeval_compare(&uptime1->last_recovery_finished,
2568                             &uptime2->last_recovery_finished) != 0) {
2569                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2570                 talloc_free(mem_ctx);
2571                 return 0;
2572         }
2573
2574         /* skip the check if we have started but not finished recovery */
2575         if (timeval_compare(&uptime1->last_recovery_finished,
2576                             &uptime1->last_recovery_started) != 1) {
2577                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2578                 talloc_free(mem_ctx);
2579
2580                 return 0;
2581         }
2582
2583         talloc_free(rec->ifaces);
2584         rec->ifaces = talloc_steal(rec, ifaces);
2585
2586         /* verify that we have the ip addresses we should have
2587            and we dont have ones we shouldnt have.
2588            if we find an inconsistency we set recmode to
2589            active on the local node and wait for the recmaster
2590            to do a full blown recovery.
2591            also if the pnn is -1 and we are healthy and can host the ip
2592            we also request a ip reallocation.
2593         */
2594         if (ctdb->tunable.disable_ip_failover == 0) {
2595                 for (j=0; j<ips->num; j++) {
2596                         if (ips->ips[j].pnn == -1 && nodemap->nodes[pnn].flags == 0) {
2597                                 DEBUG(DEBUG_CRIT,("Public address '%s' is not assigned and we could serve this ip\n",
2598                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2599                                 need_takeover_run = true;
2600                         } else if (ips->ips[j].pnn == pnn) {
2601                                 if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2602                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2603                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2604                                         need_takeover_run = true;
2605                                 }
2606                         } else {
2607                                 if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2608                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2609                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2610                                         need_takeover_run = true;
2611                                 }
2612                         }
2613                 }
2614         }
2615
2616         if (need_takeover_run) {
2617                 struct takeover_run_reply rd;
2618                 TDB_DATA data;
2619
2620                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2621
2622                 rd.pnn = ctdb->pnn;
2623                 rd.srvid = 0;
2624                 data.dptr = (uint8_t *)&rd;
2625                 data.dsize = sizeof(rd);
2626
2627                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2628                 if (ret != 0) {
2629                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2630                 }
2631         }
2632         talloc_free(mem_ctx);
2633         return 0;
2634 }
2635
2636
2637 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2638 {
2639         struct ctdb_node_map **remote_nodemaps = callback_data;
2640
2641         if (node_pnn >= ctdb->num_nodes) {
2642                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2643                 return;
2644         }
2645
2646         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2647
2648 }
2649
2650 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2651         struct ctdb_node_map *nodemap,
2652         struct ctdb_node_map **remote_nodemaps)
2653 {
2654         uint32_t *nodes;
2655
2656         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2657         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2658                                         nodes, 0,
2659                                         CONTROL_TIMEOUT(), false, tdb_null,
2660                                         async_getnodemap_callback,
2661                                         NULL,
2662                                         remote_nodemaps) != 0) {
2663                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2664
2665                 return -1;
2666         }
2667
2668         return 0;
2669 }
2670
2671 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2672 struct ctdb_check_reclock_state {
2673         struct ctdb_context *ctdb;
2674         struct timeval start_time;
2675         int fd[2];
2676         pid_t child;
2677         struct timed_event *te;
2678         struct fd_event *fde;
2679         enum reclock_child_status status;
2680 };
2681
2682 /* when we free the reclock state we must kill any child process.
2683 */
2684 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2685 {
2686         struct ctdb_context *ctdb = state->ctdb;
2687
2688         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2689
2690         if (state->fd[0] != -1) {
2691                 close(state->fd[0]);
2692                 state->fd[0] = -1;
2693         }
2694         if (state->fd[1] != -1) {
2695                 close(state->fd[1]);
2696                 state->fd[1] = -1;
2697         }
2698         kill(state->child, SIGKILL);
2699         return 0;
2700 }
2701
2702 /*
2703   called if our check_reclock child times out. this would happen if
2704   i/o to the reclock file blocks.
2705  */
2706 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2707                                          struct timeval t, void *private_data)
2708 {
2709         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2710                                            struct ctdb_check_reclock_state);
2711
2712         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2713         state->status = RECLOCK_TIMEOUT;
2714 }
2715
2716 /* this is called when the child process has completed checking the reclock
2717    file and has written data back to us through the pipe.
2718 */
2719 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2720                              uint16_t flags, void *private_data)
2721 {
2722         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2723                                              struct ctdb_check_reclock_state);
2724         char c = 0;
2725         int ret;
2726
2727         /* we got a response from our child process so we can abort the
2728            timeout.
2729         */
2730         talloc_free(state->te);
2731         state->te = NULL;
2732
2733         ret = read(state->fd[0], &c, 1);
2734         if (ret != 1 || c != RECLOCK_OK) {
2735                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2736                 state->status = RECLOCK_FAILED;
2737
2738                 return;
2739         }
2740
2741         state->status = RECLOCK_OK;
2742         return;
2743 }
2744
2745 static int check_recovery_lock(struct ctdb_context *ctdb)
2746 {
2747         int ret;
2748         struct ctdb_check_reclock_state *state;
2749         pid_t parent = getpid();
2750
2751         if (ctdb->recovery_lock_fd == -1) {
2752                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2753                 return -1;
2754         }
2755
2756         state = talloc(ctdb, struct ctdb_check_reclock_state);
2757         CTDB_NO_MEMORY(ctdb, state);
2758
2759         state->ctdb = ctdb;
2760         state->start_time = timeval_current();
2761         state->status = RECLOCK_CHECKING;
2762         state->fd[0] = -1;
2763         state->fd[1] = -1;
2764
2765         ret = pipe(state->fd);
2766         if (ret != 0) {
2767                 talloc_free(state);
2768                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2769                 return -1;
2770         }
2771
2772         state->child = ctdb_fork(ctdb);
2773         if (state->child == (pid_t)-1) {
2774                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2775                 close(state->fd[0]);
2776                 state->fd[0] = -1;
2777                 close(state->fd[1]);
2778                 state->fd[1] = -1;
2779                 talloc_free(state);
2780                 return -1;
2781         }
2782
2783         if (state->child == 0) {
2784                 char cc = RECLOCK_OK;
2785                 close(state->fd[0]);
2786                 state->fd[0] = -1;
2787
2788                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
2789                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2790                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2791                         cc = RECLOCK_FAILED;
2792                 }
2793
2794                 write(state->fd[1], &cc, 1);
2795                 /* make sure we die when our parent dies */
2796                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2797                         sleep(5);
2798                         write(state->fd[1], &cc, 1);
2799                 }
2800                 _exit(0);
2801         }
2802         close(state->fd[1]);
2803         state->fd[1] = -1;
2804         set_close_on_exec(state->fd[0]);
2805
2806         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2807
2808         talloc_set_destructor(state, check_reclock_destructor);
2809
2810         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2811                                     ctdb_check_reclock_timeout, state);
2812         if (state->te == NULL) {
2813                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2814                 talloc_free(state);
2815                 return -1;
2816         }
2817
2818         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2819                                 EVENT_FD_READ,
2820                                 reclock_child_handler,
2821                                 (void *)state);
2822
2823         if (state->fde == NULL) {
2824                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2825                 talloc_free(state);
2826                 return -1;
2827         }
2828         tevent_fd_set_auto_close(state->fde);
2829
2830         while (state->status == RECLOCK_CHECKING) {
2831                 event_loop_once(ctdb->ev);
2832         }
2833
2834         if (state->status == RECLOCK_FAILED) {
2835                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2836                 close(ctdb->recovery_lock_fd);
2837                 ctdb->recovery_lock_fd = -1;
2838                 talloc_free(state);
2839                 return -1;
2840         }
2841
2842         talloc_free(state);
2843         return 0;
2844 }
2845
2846 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2847 {
2848         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2849         const char *reclockfile;
2850
2851         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2852                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2853                 talloc_free(tmp_ctx);
2854                 return -1;      
2855         }
2856
2857         if (reclockfile == NULL) {
2858                 if (ctdb->recovery_lock_file != NULL) {
2859                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2860                         talloc_free(ctdb->recovery_lock_file);
2861                         ctdb->recovery_lock_file = NULL;
2862                         if (ctdb->recovery_lock_fd != -1) {
2863                                 close(ctdb->recovery_lock_fd);
2864                                 ctdb->recovery_lock_fd = -1;
2865                         }
2866                 }
2867                 ctdb->tunable.verify_recovery_lock = 0;
2868                 talloc_free(tmp_ctx);
2869                 return 0;
2870         }
2871
2872         if (ctdb->recovery_lock_file == NULL) {
2873                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2874                 if (ctdb->recovery_lock_fd != -1) {
2875                         close(ctdb->recovery_lock_fd);
2876                         ctdb->recovery_lock_fd = -1;
2877                 }
2878                 talloc_free(tmp_ctx);
2879                 return 0;
2880         }
2881
2882
2883         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2884                 talloc_free(tmp_ctx);
2885                 return 0;
2886         }
2887
2888         talloc_free(ctdb->recovery_lock_file);
2889         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2890         ctdb->tunable.verify_recovery_lock = 0;
2891         if (ctdb->recovery_lock_fd != -1) {
2892                 close(ctdb->recovery_lock_fd);
2893                 ctdb->recovery_lock_fd = -1;
2894         }
2895
2896         talloc_free(tmp_ctx);
2897         return 0;
2898 }
2899
2900 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2901                       TALLOC_CTX *mem_ctx)
2902 {
2903         uint32_t pnn;
2904         struct ctdb_node_map *nodemap=NULL;
2905         struct ctdb_node_map *recmaster_nodemap=NULL;
2906         struct ctdb_node_map **remote_nodemaps=NULL;
2907         struct ctdb_vnn_map *vnnmap=NULL;
2908         struct ctdb_vnn_map *remote_vnnmap=NULL;
2909         int32_t debug_level;
2910         int i, j, ret;
2911
2912
2913
2914         /* verify that the main daemon is still running */
2915         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2916                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2917                 exit(-1);
2918         }
2919
2920         /* ping the local daemon to tell it we are alive */
2921         ctdb_ctrl_recd_ping(ctdb);
2922
2923         if (rec->election_timeout) {
2924                 /* an election is in progress */
2925                 return;
2926         }
2927
2928         /* read the debug level from the parent and update locally */
2929         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2930         if (ret !=0) {
2931                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2932                 return;
2933         }
2934         LogLevel = debug_level;
2935
2936
2937         /* We must check if we need to ban a node here but we want to do this
2938            as early as possible so we dont wait until we have pulled the node
2939            map from the local node. thats why we have the hardcoded value 20
2940         */
2941         for (i=0; i<ctdb->num_nodes; i++) {
2942                 struct ctdb_banning_state *ban_state;
2943
2944                 if (ctdb->nodes[i]->ban_state == NULL) {
2945                         continue;
2946                 }
2947                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2948                 if (ban_state->count < 20) {
2949                         continue;
2950                 }
2951                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2952                         ctdb->nodes[i]->pnn, ban_state->count,
2953                         ctdb->tunable.recovery_ban_period));
2954                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2955                 ban_state->count = 0;
2956         }
2957
2958         /* get relevant tunables */
2959         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2960         if (ret != 0) {
2961                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2962                 return;
2963         }
2964
2965         /* get the current recovery lock file from the server */
2966         if (update_recovery_lock_file(ctdb) != 0) {
2967                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2968                 return;
2969         }
2970
2971         /* Make sure that if recovery lock verification becomes disabled when
2972            we close the file
2973         */
2974         if (ctdb->tunable.verify_recovery_lock == 0) {
2975                 if (ctdb->recovery_lock_fd != -1) {
2976                         close(ctdb->recovery_lock_fd);
2977                         ctdb->recovery_lock_fd = -1;
2978                 }
2979         }
2980
2981         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2982         if (pnn == (uint32_t)-1) {
2983                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2984                 return;
2985         }
2986
2987         /* get the vnnmap */
2988         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2989         if (ret != 0) {
2990                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2991                 return;
2992         }
2993
2994
2995         /* get number of nodes */
2996         if (rec->nodemap) {
2997                 talloc_free(rec->nodemap);
2998                 rec->nodemap = NULL;
2999                 nodemap=NULL;
3000         }
3001         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3002         if (ret != 0) {
3003                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3004                 return;
3005         }
3006         nodemap = rec->nodemap;
3007
3008         /* check which node is the recovery master */
3009         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3010         if (ret != 0) {
3011                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3012                 return;
3013         }
3014
3015         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3016         if (rec->recmaster != pnn) {
3017                 if (rec->ip_reallocate_ctx != NULL) {
3018                         talloc_free(rec->ip_reallocate_ctx);
3019                         rec->ip_reallocate_ctx = NULL;
3020                         rec->reallocate_callers = NULL;
3021                 }
3022         }
3023         /* if there are takeovers requested, perform it and notify the waiters */
3024         if (rec->reallocate_callers) {
3025                 process_ipreallocate_requests(ctdb, rec);
3026         }
3027
3028         if (rec->recmaster == (uint32_t)-1) {
3029                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3030                 force_election(rec, pnn, nodemap);
3031                 return;
3032         }
3033
3034
3035         /* if the local daemon is STOPPED, we verify that the databases are
3036            also frozen and thet the recmode is set to active 
3037         */
3038         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3039                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3040                 if (ret != 0) {
3041                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3042                 }
3043                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3044                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3045
3046                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3047                         if (ret != 0) {
3048                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3049                                 return;
3050                         }
3051                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3052                         if (ret != 0) {
3053                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3054
3055                                 return;
3056                         }
3057                         return;
3058                 }
3059         }
3060         /* If the local node is stopped, verify we are not the recmaster 
3061            and yield this role if so
3062         */
3063         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3064                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3065                 force_election(rec, pnn, nodemap);
3066                 return;
3067         }
3068         
3069         /* check that we (recovery daemon) and the local ctdb daemon
3070            agrees on whether we are banned or not
3071         */
3072 //qqq
3073
3074         /* remember our own node flags */
3075         rec->node_flags = nodemap->nodes[pnn].flags;
3076
3077         /* count how many active nodes there are */
3078         rec->num_active    = 0;
3079         rec->num_connected = 0;
3080         for (i=0; i<nodemap->num; i++) {
3081                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3082                         rec->num_active++;
3083                 }
3084                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3085                         rec->num_connected++;
3086                 }
3087         }
3088
3089
3090         /* verify that the recmaster node is still active */
3091         for (j=0; j<nodemap->num; j++) {
3092                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3093                         break;
3094                 }
3095         }
3096
3097         if (j == nodemap->num) {
3098                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3099                 force_election(rec, pnn, nodemap);
3100                 return;
3101         }
3102
3103         /* if recovery master is disconnected we must elect a new recmaster */
3104         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3105                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3106                 force_election(rec, pnn, nodemap);
3107                 return;
3108         }
3109
3110         /* grap the nodemap from the recovery master to check if it is banned */
3111         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3112                                    mem_ctx, &recmaster_nodemap);
3113         if (ret != 0) {
3114                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3115                           nodemap->nodes[j].pnn));
3116                 return;
3117         }
3118
3119
3120         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3121                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3122                 force_election(rec, pnn, nodemap);
3123                 return;
3124         }
3125
3126
3127         /* verify that we have all ip addresses we should have and we dont
3128          * have addresses we shouldnt have.
3129          */ 
3130         if (ctdb->tunable.disable_ip_failover == 0) {
3131                 if (rec->ip_check_disable_ctx == NULL) {
3132                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3133                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3134                         }
3135                 }
3136         }
3137
3138
3139         /* if we are not the recmaster then we do not need to check
3140            if recovery is needed
3141          */
3142         if (pnn != rec->recmaster) {
3143                 return;
3144         }
3145
3146
3147         /* ensure our local copies of flags are right */
3148         ret = update_local_flags(rec, nodemap);
3149         if (ret == MONITOR_ELECTION_NEEDED) {
3150                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3151                 force_election(rec, pnn, nodemap);
3152                 return;
3153         }
3154         if (ret != MONITOR_OK) {
3155                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3156                 return;
3157         }
3158
3159         if (ctdb->num_nodes != nodemap->num) {
3160                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3161                 reload_nodes_file(ctdb);
3162                 return;
3163         }
3164
3165         /* verify that all active nodes agree that we are the recmaster */
3166         switch (verify_recmaster(rec, nodemap, pnn)) {
3167         case MONITOR_RECOVERY_NEEDED:
3168                 /* can not happen */
3169                 return;
3170         case MONITOR_ELECTION_NEEDED:
3171                 force_election(rec, pnn, nodemap);
3172                 return;
3173         case MONITOR_OK:
3174                 break;
3175         case MONITOR_FAILED:
3176                 return;
3177         }
3178
3179
3180         if (rec->need_recovery) {
3181                 /* a previous recovery didn't finish */
3182                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3183                 return;
3184         }
3185
3186         /* verify that all active nodes are in normal mode 
3187            and not in recovery mode 
3188         */
3189         switch (verify_recmode(ctdb, nodemap)) {
3190         case MONITOR_RECOVERY_NEEDED:
3191                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3192                 return;
3193         case MONITOR_FAILED:
3194                 return;
3195         case MONITOR_ELECTION_NEEDED:
3196                 /* can not happen */
3197         case MONITOR_OK:
3198                 break;
3199         }
3200
3201
3202         if (ctdb->tunable.verify_recovery_lock != 0) {
3203                 /* we should have the reclock - check its not stale */
3204                 ret = check_recovery_lock(ctdb);
3205                 if (ret != 0) {
3206                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3207                         ctdb_set_culprit(rec, ctdb->pnn);
3208                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3209                         return;
3210                 }
3211         }
3212
3213         /* get the nodemap for all active remote nodes
3214          */
3215         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3216         if (remote_nodemaps == NULL) {
3217                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3218                 return;
3219         }
3220         for(i=0; i<nodemap->num; i++) {
3221                 remote_nodemaps[i] = NULL;
3222         }
3223         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3224                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3225                 return;
3226         } 
3227
3228         /* verify that all other nodes have the same nodemap as we have
3229         */
3230         for (j=0; j<nodemap->num; j++) {
3231                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3232                         continue;
3233                 }
3234
3235                 if (remote_nodemaps[j] == NULL) {
3236                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3237                         ctdb_set_culprit(rec, j);
3238
3239                         return;
3240                 }
3241
3242                 /* if the nodes disagree on how many nodes there are
3243                    then this is a good reason to try recovery
3244                  */
3245                 if (remote_nodemaps[j]->num != nodemap->num) {
3246                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3247                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3248                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3249                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3250                         return;
3251                 }
3252
3253                 /* if the nodes disagree on which nodes exist and are
3254                    active, then that is also a good reason to do recovery
3255                  */
3256                 for (i=0;i<nodemap->num;i++) {
3257                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3258                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3259                                           nodemap->nodes[j].pnn, i, 
3260                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3261                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3262                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3263                                             vnnmap);
3264                                 return;
3265                         }
3266                 }
3267
3268                 /* verify the flags are consistent
3269                 */
3270                 for (i=0; i<nodemap->num; i++) {
3271                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3272                                 continue;
3273                         }
3274                         
3275                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3276                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3277                                   nodemap->nodes[j].pnn, 
3278                                   nodemap->nodes[i].pnn, 
3279                                   remote_nodemaps[j]->nodes[i].flags,
3280                                   nodemap->nodes[j].flags));
3281                                 if (i == j) {
3282                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3283                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3284                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3285                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3286                                                     vnnmap);
3287                                         return;
3288                                 } else {
3289                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3290                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3291                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3292                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3293                                                     vnnmap);
3294                                         return;
3295                                 }
3296                         }
3297                 }
3298         }
3299
3300
3301         /* there better be the same number of lmasters in the vnn map
3302            as there are active nodes or we will have to do a recovery
3303          */
3304         if (vnnmap->size != rec->num_active) {
3305                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3306                           vnnmap->size, rec->num_active));
3307                 ctdb_set_culprit(rec, ctdb->pnn);
3308                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3309                 return;
3310         }
3311
3312         /* verify that all active nodes in the nodemap also exist in 
3313            the vnnmap.
3314          */
3315         for (j=0; j<nodemap->num; j++) {
3316                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3317                         continue;
3318                 }
3319                 if (nodemap->nodes[j].pnn == pnn) {
3320                         continue;
3321                 }
3322
3323                 for (i=0; i<vnnmap->size; i++) {
3324                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3325                                 break;
3326                         }
3327                 }
3328                 if (i == vnnmap->size) {
3329                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3330                                   nodemap->nodes[j].pnn));
3331                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3332                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3333                         return;
3334                 }
3335         }
3336
3337         
3338         /* verify that all other nodes have the same vnnmap
3339            and are from the same generation
3340          */
3341         for (j=0; j<nodemap->num; j++) {
3342                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3343                         continue;
3344                 }
3345                 if (nodemap->nodes[j].pnn == pnn) {
3346                         continue;
3347                 }
3348
3349                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3350                                           mem_ctx, &remote_vnnmap);
3351                 if (ret != 0) {
3352                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3353                                   nodemap->nodes[j].pnn));
3354                         return;
3355                 }
3356
3357                 /* verify the vnnmap generation is the same */
3358                 if (vnnmap->generation != remote_vnnmap->generation) {
3359                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3360                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3361                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3362                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3363                         return;
3364                 }
3365
3366                 /* verify the vnnmap size is the same */
3367                 if (vnnmap->size != remote_vnnmap->size) {
3368                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3369                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3370                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3371                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3372                         return;
3373                 }
3374
3375                 /* verify the vnnmap is the same */
3376                 for (i=0;i<vnnmap->size;i++) {
3377                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3378                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3379                                           nodemap->nodes[j].pnn));
3380                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3381                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3382                                             vnnmap);
3383                                 return;
3384                         }
3385                 }
3386         }
3387
3388         /* we might need to change who has what IP assigned */
3389         if (rec->need_takeover_run) {
3390                 uint32_t culprit = (uint32_t)-1;
3391
3392                 rec->need_takeover_run = false;
3393
3394                 /* update the list of public ips that a node can handle for
3395                    all connected nodes
3396                 */
3397                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3398                 if (ret != 0) {
3399                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3400                                          culprit));
3401                         rec->need_takeover_run = true;
3402                         return;
3403                 }
3404
3405                 /* execute the "startrecovery" event script on all nodes */
3406                 ret = run_startrecovery_eventscript(rec, nodemap);
3407                 if (ret!=0) {
3408                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3409                         ctdb_set_culprit(rec, ctdb->pnn);
3410                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3411                         return;
3412                 }
3413
3414                 ret = ctdb_takeover_run(ctdb, nodemap);
3415                 if (ret != 0) {
3416                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Try again later\n"));
3417                         return;
3418                 }
3419
3420                 /* execute the "recovered" event script on all nodes */
3421                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3422 #if 0
3423 // we cant check whether the event completed successfully
3424 // since this script WILL fail if the node is in recovery mode
3425 // and if that race happens, the code here would just cause a second
3426 // cascading recovery.
3427                 if (ret!=0) {
3428                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3429                         ctdb_set_culprit(rec, ctdb->pnn);
3430                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3431                 }
3432 #endif
3433         }
3434 }
3435
3436 /*
3437   the main monitoring loop
3438  */
3439 static void monitor_cluster(struct ctdb_context *ctdb)
3440 {
3441         struct ctdb_recoverd *rec;
3442
3443         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3444
3445         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3446         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3447
3448         rec->ctdb = ctdb;
3449
3450         rec->priority_time = timeval_current();
3451
3452         /* register a message port for sending memory dumps */
3453         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3454
3455         /* register a message port for recovery elections */
3456         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3457
3458         /* when nodes are disabled/enabled */
3459         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3460
3461         /* when we are asked to puch out a flag change */
3462         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3463
3464         /* register a message port for vacuum fetch */
3465         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3466
3467         /* register a message port for reloadnodes  */
3468         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3469
3470         /* register a message port for performing a takeover run */
3471         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3472
3473         /* register a message port for disabling the ip check for a short while */
3474         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3475
3476         /* register a message port for updating the recovery daemons node assignment for an ip */
3477         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3478
3479         for (;;) {
3480                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3481                 struct timeval start;
3482                 double elapsed;
3483
3484                 if (!mem_ctx) {
3485                         DEBUG(DEBUG_CRIT,(__location__
3486                                           " Failed to create temp context\n"));
3487                         exit(-1);
3488                 }
3489
3490                 start = timeval_current();
3491                 main_loop(ctdb, rec, mem_ctx);
3492                 talloc_free(mem_ctx);
3493
3494                 /* we only check for recovery once every second */
3495                 elapsed = timeval_elapsed(&start);
3496                 if (elapsed < ctdb->tunable.recover_interval) {
3497                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3498                                           - elapsed);
3499                 }
3500         }
3501 }
3502
3503 /*
3504   event handler for when the main ctdbd dies
3505  */
3506 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3507                                  uint16_t flags, void *private_data)
3508 {
3509         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3510         _exit(1);
3511 }
3512
3513 /*
3514   called regularly to verify that the recovery daemon is still running
3515  */
3516 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3517                               struct timeval yt, void *p)
3518 {
3519         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3520
3521         if (kill(ctdb->recoverd_pid, 0) != 0) {
3522                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3523
3524                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3525                                 ctdb_restart_recd, ctdb);
3526
3527                 return;
3528         }
3529
3530         event_add_timed(ctdb->ev, ctdb, 
3531                         timeval_current_ofs(30, 0),
3532                         ctdb_check_recd, ctdb);
3533 }
3534
3535 static void recd_sig_child_handler(struct event_context *ev,
3536         struct signal_event *se, int signum, int count,
3537         void *dont_care, 
3538         void *private_data)
3539 {
3540 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3541         int status;
3542         pid_t pid = -1;
3543
3544         while (pid != 0) {
3545                 pid = waitpid(-1, &status, WNOHANG);
3546                 if (pid == -1) {
3547                         if (errno != ECHILD) {
3548                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3549                         }
3550                         return;
3551                 }
3552                 if (pid > 0) {
3553                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3554                 }
3555         }
3556 }
3557
3558 /*
3559   startup the recovery daemon as a child of the main ctdb daemon
3560  */
3561 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3562 {
3563         int fd[2];
3564         struct signal_event *se;
3565         struct tevent_fd *fde;
3566
3567         if (pipe(fd) != 0) {
3568                 return -1;
3569         }
3570
3571         ctdb->ctdbd_pid = getpid();
3572
3573         ctdb->recoverd_pid = fork();
3574         if (ctdb->recoverd_pid == -1) {
3575                 return -1;
3576         }
3577         
3578         if (ctdb->recoverd_pid != 0) {
3579                 close(fd[0]);
3580                 event_add_timed(ctdb->ev, ctdb, 
3581                                 timeval_current_ofs(30, 0),
3582                                 ctdb_check_recd, ctdb);
3583                 return 0;
3584         }
3585
3586         close(fd[1]);
3587
3588         srandom(getpid() ^ time(NULL));
3589
3590         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3591                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3592                 exit(1);
3593         }
3594
3595         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3596
3597         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3598                      ctdb_recoverd_parent, &fd[0]);     
3599         tevent_fd_set_auto_close(fde);
3600
3601         /* set up a handler to pick up sigchld */
3602         se = event_add_signal(ctdb->ev, ctdb,
3603                                      SIGCHLD, 0,
3604                                      recd_sig_child_handler,
3605                                      ctdb);
3606         if (se == NULL) {
3607                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3608                 exit(1);
3609         }
3610
3611         monitor_cluster(ctdb);
3612
3613         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3614         return -1;
3615 }
3616
3617 /*
3618   shutdown the recovery daemon
3619  */
3620 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3621 {
3622         if (ctdb->recoverd_pid == 0) {
3623                 return;
3624         }
3625
3626         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3627         kill(ctdb->recoverd_pid, SIGTERM);
3628 }
3629
3630 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
3631                        struct timeval t, void *private_data)
3632 {
3633         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3634
3635         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3636         ctdb_stop_recoverd(ctdb);
3637         ctdb_start_recoverd(ctdb);
3638 }