1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "include/ctdb_protocol.h"
29 #include "include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
70 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
71 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
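/* timeouts derived from the recover_timeout and recover_interval tunables;
   CONTROL_TIMEOUT() is used for every control the recovery daemon sends to
   other nodes in the functions below */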
72
73
74 /*
75   ban a node for a period of time
76  */
77 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
78 {
79         int ret;
80         struct ctdb_context *ctdb = rec->ctdb;
81         struct ctdb_ban_time bantime;
82        
83         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
84
85         if (!ctdb_validate_pnn(ctdb, pnn)) {
86                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
87                 return;
88         }
89
90         bantime.pnn  = pnn;
91         bantime.time = ban_time;
92
93         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
94         if (ret != 0) {
95                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
96                 return;
97         }
98
99 }
100
101 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
102
103
104 /*
105   run the "recovered" eventscript on all nodes
106  */
107 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
108 {
109         TALLOC_CTX *tmp_ctx;
110         uint32_t *nodes;
111
112         tmp_ctx = talloc_new(ctdb);
113         CTDB_NO_MEMORY(ctdb, tmp_ctx);
114
115         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
116         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
117                                         nodes, 0,
118                                         CONTROL_TIMEOUT(), false, tdb_null,
119                                         NULL, NULL,
120                                         NULL) != 0) {
121                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
122
123                 talloc_free(tmp_ctx);
124                 return -1;
125         }
126
127         talloc_free(tmp_ctx);
128         return 0;
129 }
130
131 /*
132   remember the trouble maker
133  */
134 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
135 {
136         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
137         struct ctdb_banning_state *ban_state;
138
139         if (culprit >= ctdb->num_nodes) {
140                 DEBUG(DEBUG_ERR,("Trying to set culprit %u but num_nodes is %u\n", culprit, ctdb->num_nodes));
141                 return;
142         }
143
144         if (ctdb->nodes[culprit]->ban_state == NULL) {
145                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
146                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
147
148                 
149         }
150         ban_state = ctdb->nodes[culprit]->ban_state;
151         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
152                 /* this was the first time in a long while this node
153                    misbehaved so we will forgive any old transgressions.
154                 */
155                 ban_state->count = 0;
156         }
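        /* each misbehaviour adds "count" credits to the node; do_recovery()
           bans a node once it has collected at least 2*num_nodes credits,
           and the counter is reset above if the node has behaved for longer
           than the recovery_grace_period tunable */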
157
158         ban_state->count += count;
159         ban_state->last_reported_time = timeval_current();
160         rec->last_culprit_node = culprit;
161 }
162
163 /*
164   remember the trouble maker
165  */
166 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
167 {
168         ctdb_set_culprit_count(rec, culprit, 1);
169 }
170
171
172 /* this callback is called for every node that failed to execute the
173    start recovery event
174 */
175 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
176 {
177         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
178
179         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
180
181         ctdb_set_culprit(rec, node_pnn);
182 }
183
184 /*
185   run the "startrecovery" eventscript on all nodes
186  */
187 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
188 {
189         TALLOC_CTX *tmp_ctx;
190         uint32_t *nodes;
191         struct ctdb_context *ctdb = rec->ctdb;
192
193         tmp_ctx = talloc_new(ctdb);
194         CTDB_NO_MEMORY(ctdb, tmp_ctx);
195
196         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
198                                         nodes, 0,
199                                         CONTROL_TIMEOUT(), false, tdb_null,
200                                         NULL,
201                                         startrecovery_fail_callback,
202                                         rec) != 0) {
203                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
204                 talloc_free(tmp_ctx);
205                 return -1;
206         }
207
208         talloc_free(tmp_ctx);
209         return 0;
210 }
211
212 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
213 {
214         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
215                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
216                 return;
217         }
218         if (node_pnn < ctdb->num_nodes) {
219                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
220         }
221 }
222
223 /*
224   update the node capabilities for all connected nodes
225  */
226 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
227 {
228         uint32_t *nodes;
229         TALLOC_CTX *tmp_ctx;
230
231         tmp_ctx = talloc_new(ctdb);
232         CTDB_NO_MEMORY(ctdb, tmp_ctx);
233
234         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
235         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
236                                         nodes, 0,
237                                         CONTROL_TIMEOUT(),
238                                         false, tdb_null,
239                                         async_getcap_callback, NULL,
240                                         NULL) != 0) {
241                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
242                 talloc_free(tmp_ctx);
243                 return -1;
244         }
245
246         talloc_free(tmp_ctx);
247         return 0;
248 }
249
250 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
251 {
252         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
253
254         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
255         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
256 }
257
258 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
259 {
260         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
261
262         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
263         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
264 }
265
266 /*
267   change recovery mode on all nodes
268  */
269 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
270 {
271         TDB_DATA data;
272         uint32_t *nodes;
273         TALLOC_CTX *tmp_ctx;
274
275         tmp_ctx = talloc_new(ctdb);
276         CTDB_NO_MEMORY(ctdb, tmp_ctx);
277
278         /* freeze all nodes */
279         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
280         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
281                 int i;
282
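                /* freeze the databases one priority band at a time
                   (1..NUM_DB_PRIORITIES) on every active node, so that all
                   databases are locked down before the recovery mode is
                   switched to active below */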
283                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
284                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
285                                                 nodes, i,
286                                                 CONTROL_TIMEOUT(),
287                                                 false, tdb_null,
288                                                 NULL,
289                                                 set_recmode_fail_callback,
290                                                 rec) != 0) {
291                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
292                                 talloc_free(tmp_ctx);
293                                 return -1;
294                         }
295                 }
296         }
297
298
299         data.dsize = sizeof(uint32_t);
300         data.dptr = (unsigned char *)&rec_mode;
301
302         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
303                                         nodes, 0,
304                                         CONTROL_TIMEOUT(),
305                                         false, data,
306                                         NULL, NULL,
307                                         NULL) != 0) {
308                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
309                 talloc_free(tmp_ctx);
310                 return -1;
311         }
312
313         talloc_free(tmp_ctx);
314         return 0;
315 }
316
317 /*
318   change recovery master on all nodes
319  */
320 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
321 {
322         TDB_DATA data;
323         TALLOC_CTX *tmp_ctx;
324         uint32_t *nodes;
325
326         tmp_ctx = talloc_new(ctdb);
327         CTDB_NO_MEMORY(ctdb, tmp_ctx);
328
329         data.dsize = sizeof(uint32_t);
330         data.dptr = (unsigned char *)&pnn;
331
332         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
333         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
334                                         nodes, 0,
335                                         CONTROL_TIMEOUT(), false, data,
336                                         NULL, NULL,
337                                         NULL) != 0) {
338                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
339                 talloc_free(tmp_ctx);
340                 return -1;
341         }
342
343         talloc_free(tmp_ctx);
344         return 0;
345 }
346
347 /* update all remote nodes to use the same db priority that we have.
348    This can fail if a remote node has not yet been upgraded to
349    support this control, so we always return success and never fail
350    a recovery if this call fails.
351 */
352 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
353         struct ctdb_node_map *nodemap, 
354         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
355 {
356         int db;
357         uint32_t *nodes;
358
359         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
360
361         /* step through all local databases */
362         for (db=0; db<dbmap->num;db++) {
363                 TDB_DATA data;
364                 struct ctdb_db_priority db_prio;
365                 int ret;
366
367                 db_prio.db_id     = dbmap->dbs[db].dbid;
368                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
369                 if (ret != 0) {
370                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
371                         continue;
372                 }
373
374                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
375
376                 data.dptr  = (uint8_t *)&db_prio;
377                 data.dsize = sizeof(db_prio);
378
379                 if (ctdb_client_async_control(ctdb,
380                                         CTDB_CONTROL_SET_DB_PRIORITY,
381                                         nodes, 0,
382                                         CONTROL_TIMEOUT(), false, data,
383                                         NULL, NULL,
384                                         NULL) != 0) {
385                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
386                 }
387         }
388
389         return 0;
390 }                       
391
392 /*
393   ensure all other nodes have attached to any databases that we have
394  */
395 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
396                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
397 {
398         int i, j, db, ret;
399         struct ctdb_dbid_map *remote_dbmap;
400
401         /* verify that all other nodes have all our databases */
402         for (j=0; j<nodemap->num; j++) {
403                 /* we don't need to check ourselves */
404                 if (nodemap->nodes[j].pnn == pnn) {
405                         continue;
406                 }
407                 /* don't check nodes that are unavailable */
408                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
409                         continue;
410                 }
411
412                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
413                                          mem_ctx, &remote_dbmap);
414                 if (ret != 0) {
415                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
416                         return -1;
417                 }
418
419                 /* step through all local databases */
420                 for (db=0; db<dbmap->num;db++) {
421                         const char *name;
422
423
424                         for (i=0;i<remote_dbmap->num;i++) {
425                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
426                                         break;
427                                 }
428                         }
429                         /* the remote node already has this database */
430                         if (i!=remote_dbmap->num) {
431                                 continue;
432                         }
433                         /* ok so we need to create this database */
434                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
435                                             mem_ctx, &name);
436                         if (ret != 0) {
437                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
438                                 return -1;
439                         }
440                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
441                                            mem_ctx, name, dbmap->dbs[db].persistent);
442                         if (ret != 0) {
443                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
444                                 return -1;
445                         }
446                 }
447         }
448
449         return 0;
450 }
451
452
453 /*
454   ensure we are attached to any databases that anyone else is attached to
455  */
456 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
457                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
458 {
459         int i, j, db, ret;
460         struct ctdb_dbid_map *remote_dbmap;
461
462         /* verify that we have all databases any other node has */
463         for (j=0; j<nodemap->num; j++) {
464                 /* we don't need to check ourselves */
465                 if (nodemap->nodes[j].pnn == pnn) {
466                         continue;
467                 }
468                 /* don't check nodes that are unavailable */
469                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
470                         continue;
471                 }
472
473                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
474                                          mem_ctx, &remote_dbmap);
475                 if (ret != 0) {
476                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
477                         return -1;
478                 }
479
480                 /* step through all databases on the remote node */
481                 for (db=0; db<remote_dbmap->num;db++) {
482                         const char *name;
483
484                         for (i=0;i<(*dbmap)->num;i++) {
485                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
486                                         break;
487                                 }
488                         }
489                         /* we already have this db locally */
490                         if (i!=(*dbmap)->num) {
491                                 continue;
492                         }
493                         /* ok so we need to create this database and
494                            rebuild dbmap
495                          */
496                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
497                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
498                         if (ret != 0) {
499                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
500                                           nodemap->nodes[j].pnn));
501                                 return -1;
502                         }
503                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
504                                            remote_dbmap->dbs[db].persistent);
505                         if (ret != 0) {
506                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
507                                 return -1;
508                         }
509                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
510                         if (ret != 0) {
511                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
512                                 return -1;
513                         }
514                 }
515         }
516
517         return 0;
518 }
519
520
521 /*
522   pull the remote database contents from one node into the recdb
523  */
524 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
525                                     struct tdb_wrap *recdb, uint32_t dbid,
526                                     bool persistent)
527 {
528         int ret;
529         TDB_DATA outdata;
530         struct ctdb_marshall_buffer *reply;
531         struct ctdb_rec_data *rec;
532         int i;
533         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
534
535         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
536                                CONTROL_TIMEOUT(), &outdata);
537         if (ret != 0) {
538                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
539                 talloc_free(tmp_ctx);
540                 return -1;
541         }
542
543         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
544
545         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
546                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
547                 talloc_free(tmp_ctx);
548                 return -1;
549         }
550         
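        /* the records come back packed end to end in a ctdb_marshall_buffer;
           each ctdb_rec_data holds the key followed by the data, and the data
           starts with the record's ctdb_ltdb_header */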
551         rec = (struct ctdb_rec_data *)&reply->data[0];
552         
553         for (i=0;
554              i<reply->count;
555              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
556                 TDB_DATA key, data;
557                 struct ctdb_ltdb_header *hdr;
558                 TDB_DATA existing;
559                 
560                 key.dptr = &rec->data[0];
561                 key.dsize = rec->keylen;
562                 data.dptr = &rec->data[key.dsize];
563                 data.dsize = rec->datalen;
564                 
565                 hdr = (struct ctdb_ltdb_header *)data.dptr;
566
567                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
568                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
569                         talloc_free(tmp_ctx);
570                         return -1;
571                 }
572
573                 /* fetch the existing record, if any */
574                 existing = tdb_fetch(recdb->tdb, key);
575                 
576                 if (existing.dptr != NULL) {
577                         struct ctdb_ltdb_header header;
578                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
579                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
580                                          (unsigned)existing.dsize, srcnode));
581                                 free(existing.dptr);
582                                 talloc_free(tmp_ctx);
583                                 return -1;
584                         }
585                         header = *(struct ctdb_ltdb_header *)existing.dptr;
586                         free(existing.dptr);
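                        /* keep the copy we already have unless the pulled
                           record is newer: it must have a higher rsn, or the
                           same rsn while our stored copy does not name the
                           recovery master as dmaster */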
587                         if (!(header.rsn < hdr->rsn ||
588                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
589                                 continue;
590                         }
591                 }
592                 
593                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
594                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
595                         talloc_free(tmp_ctx);
596                         return -1;                              
597                 }
598         }
599
600         talloc_free(tmp_ctx);
601
602         return 0;
603 }
604
605 /*
606   pull all the remote database contents into the recdb
607  */
608 static int pull_remote_database(struct ctdb_context *ctdb,
609                                 struct ctdb_recoverd *rec, 
610                                 struct ctdb_node_map *nodemap, 
611                                 struct tdb_wrap *recdb, uint32_t dbid,
612                                 bool persistent)
613 {
614         int j;
615
616         /* pull all records from all other nodes across onto this node
617            (this merges based on rsn)
618         */
619         for (j=0; j<nodemap->num; j++) {
620                 /* don't merge from nodes that are unavailable */
621                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
622                         continue;
623                 }
624                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
625                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
626                                  nodemap->nodes[j].pnn));
627                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
628                         return -1;
629                 }
630         }
631         
632         return 0;
633 }
634
635
636 /*
637   update flags on all active nodes
638  */
639 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
640 {
641         int ret;
642
643         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
644         if (ret != 0) {
645                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
646                 return -1;
647         }
648
649         return 0;
650 }
651
652 /*
653   ensure all nodes have the same vnnmap we do
654  */
655 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
656                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
657 {
658         int j, ret;
659
660         /* push the new vnn map out to all the nodes */
661         for (j=0; j<nodemap->num; j++) {
662                 /* don't push to nodes that are unavailable */
663                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
664                         continue;
665                 }
666
667                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
668                 if (ret != 0) {
669                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
670                         return -1;
671                 }
672         }
673
674         return 0;
675 }
676
677
678 struct vacuum_info {
679         struct vacuum_info *next, *prev;
680         struct ctdb_recoverd *rec;
681         uint32_t srcnode;
682         struct ctdb_db_context *ctdb_db;
683         struct ctdb_marshall_buffer *recs;
684         struct ctdb_rec_data *r;
685 };
686
687 static void vacuum_fetch_next(struct vacuum_info *v);
688
689 /*
690   called when a vacuum fetch has completed - just free it and do the next one
691  */
692 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
693 {
694         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
695         talloc_free(state);
696         vacuum_fetch_next(v);
697 }
698
699
700 /*
701   process the next element from the vacuum list
702 */
703 static void vacuum_fetch_next(struct vacuum_info *v)
704 {
705         struct ctdb_call call;
706         struct ctdb_rec_data *r;
707
708         while (v->recs->count) {
709                 struct ctdb_client_call_state *state;
710                 TDB_DATA data;
711                 struct ctdb_ltdb_header *hdr;
712
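                /* a CTDB_NULL_FUNC call with CTDB_IMMEDIATE_MIGRATION does no
                   work on the record itself; it is only used to migrate the
                   record so that this node becomes its dmaster */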
713                 ZERO_STRUCT(call);
714                 call.call_id = CTDB_NULL_FUNC;
715                 call.flags = CTDB_IMMEDIATE_MIGRATION;
716
717                 r = v->r;
718                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
719                 v->recs->count--;
720
721                 call.key.dptr = &r->data[0];
722                 call.key.dsize = r->keylen;
723
724                 /* ensure we don't block this daemon - just skip a record if we can't get
725                    the chainlock */
726                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
727                         continue;
728                 }
729
730                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
731                 if (data.dptr == NULL) {
732                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
733                         continue;
734                 }
735
736                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
737                         free(data.dptr);
738                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
739                         continue;
740                 }
741                 
742                 hdr = (struct ctdb_ltdb_header *)data.dptr;
743                 if (hdr->dmaster == v->rec->ctdb->pnn) {
744                         /* it's already local */
745                         free(data.dptr);
746                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
747                         continue;
748                 }
749
750                 free(data.dptr);
751
752                 state = ctdb_call_send(v->ctdb_db, &call);
753                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
754                 if (state == NULL) {
755                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
756                         talloc_free(v);
757                         return;
758                 }
759                 state->async.fn = vacuum_fetch_callback;
760                 state->async.private_data = v;
761                 return;
762         }
763
764         talloc_free(v);
765 }
766
767
768 /*
769   destroy a vacuum info structure
770  */
771 static int vacuum_info_destructor(struct vacuum_info *v)
772 {
773         DLIST_REMOVE(v->rec->vacuum_info, v);
774         return 0;
775 }
776
777
778 /*
779   handler for vacuum fetch: migrate the records another node has sent us
      so that this node becomes their dmaster
780 */
781 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
782                                  TDB_DATA data, void *private_data)
783 {
784         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
785         struct ctdb_marshall_buffer *recs;
786         int ret, i;
787         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
788         const char *name;
789         struct ctdb_dbid_map *dbmap=NULL;
790         bool persistent = false;
791         struct ctdb_db_context *ctdb_db;
792         struct ctdb_rec_data *r;
793         uint32_t srcnode;
794         struct vacuum_info *v;
795
796         recs = (struct ctdb_marshall_buffer *)data.dptr;
797         r = (struct ctdb_rec_data *)&recs->data[0];
798
799         if (recs->count == 0) {
800                 talloc_free(tmp_ctx);
801                 return;
802         }
803
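        /* the reqid field of the first record carries the pnn of the node
           that sent us this blob */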
804         srcnode = r->reqid;
805
806         for (v=rec->vacuum_info;v;v=v->next) {
807                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
808                         /* we're already working on records from this node */
809                         talloc_free(tmp_ctx);
810                         return;
811                 }
812         }
813
814         /* work out if the database is persistent */
815         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
816         if (ret != 0) {
817                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
818                 talloc_free(tmp_ctx);
819                 return;
820         }
821
822         for (i=0;i<dbmap->num;i++) {
823                 if (dbmap->dbs[i].dbid == recs->db_id) {
824                         persistent = dbmap->dbs[i].persistent;
825                         break;
826                 }
827         }
828         if (i == dbmap->num) {
829                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
830                 talloc_free(tmp_ctx);
831                 return;         
832         }
833
834         /* find the name of this database */
835         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
836                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
837                 talloc_free(tmp_ctx);
838                 return;
839         }
840
841         /* attach to it */
842         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
843         if (ctdb_db == NULL) {
844                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
845                 talloc_free(tmp_ctx);
846                 return;
847         }
848
849         v = talloc_zero(rec, struct vacuum_info);
850         if (v == NULL) {
851                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
852                 talloc_free(tmp_ctx);
853                 return;
854         }
855
856         v->rec = rec;
857         v->srcnode = srcnode;
858         v->ctdb_db = ctdb_db;
859         v->recs = talloc_memdup(v, recs, data.dsize);
860         if (v->recs == NULL) {
861                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
862                 talloc_free(v);
863                 talloc_free(tmp_ctx);
864                 return;         
865         }
866         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
867
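        /* track this fetch on rec->vacuum_info so that further messages for
           the same source node and database are ignored until we are done;
           the destructor removes us from that list again */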
868         DLIST_ADD(rec->vacuum_info, v);
869
870         talloc_set_destructor(v, vacuum_info_destructor);
871
872         vacuum_fetch_next(v);
873         talloc_free(tmp_ctx);
874 }
875
876
877 /*
878   called when ctdb_wait_timeout should finish
879  */
880 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
881                               struct timeval yt, void *p)
882 {
883         uint32_t *timed_out = (uint32_t *)p;
884         (*timed_out) = 1;
885 }
886
887 /*
888   wait for a given number of seconds
889  */
890 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
891 {
892         uint32_t timed_out = 0;
893         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
894         while (!timed_out) {
895                 event_loop_once(ctdb->ev);
896         }
897 }
898
899 /*
900   called when an election times out (ends)
901  */
902 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
903                                   struct timeval t, void *p)
904 {
905         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
906         rec->election_timeout = NULL;
907
908         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
909 }
910
911
912 /*
913   wait for an election to finish. It finishes election_timeout seconds after
914   the last election packet is received
915  */
916 static void ctdb_wait_election(struct ctdb_recoverd *rec)
917 {
918         struct ctdb_context *ctdb = rec->ctdb;
919         while (rec->election_timeout) {
920                 event_loop_once(ctdb->ev);
921         }
922 }
923
924 /*
925   Update our local flags from all remote connected nodes. 
926   This is only run when we are, or believe we are, the recovery master
927  */
928 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
929 {
930         int j;
931         struct ctdb_context *ctdb = rec->ctdb;
932         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
933
934         /* get the nodemap for all active remote nodes and verify
935            they are the same as for this node
936          */
937         for (j=0; j<nodemap->num; j++) {
938                 struct ctdb_node_map *remote_nodemap=NULL;
939                 int ret;
940
941                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
942                         continue;
943                 }
944                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
945                         continue;
946                 }
947
948                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
949                                            mem_ctx, &remote_nodemap);
950                 if (ret != 0) {
951                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
952                                   nodemap->nodes[j].pnn));
953                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
954                         talloc_free(mem_ctx);
955                         return MONITOR_FAILED;
956                 }
957                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
958                         /* We should tell our daemon about this so it
959                            updates its flags or else we will log the same 
960                            message again in the next iteration of recovery.
961                            Since we are the recovery master we can just as
962                            well update the flags on all nodes.
963                         */
964                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
965                         if (ret != 0) {
966                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                                    talloc_free(mem_ctx);
967                                 return MONITOR_FAILED;
968                         }
969
970                         /* Update our local copy of the flags in the recovery
971                            daemon.
972                         */
973                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
974                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
975                                  nodemap->nodes[j].flags));
976                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
977                 }
978                 talloc_free(remote_nodemap);
979         }
980         talloc_free(mem_ctx);
981         return MONITOR_OK;
982 }
983
984
985 /* Create a new random generation id.
986    The generation id cannot be the INVALID_GENERATION id
987 */
988 static uint32_t new_generation(void)
989 {
990         uint32_t generation;
991
992         while (1) {
993                 generation = random();
994
995                 if (generation != INVALID_GENERATION) {
996                         break;
997                 }
998         }
999
1000         return generation;
1001 }
1002
1003
1004 /*
1005   create a temporary working database
1006  */
1007 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1008 {
1009         char *name;
1010         struct tdb_wrap *recdb;
1011         unsigned tdb_flags;
1012
1013         /* open up the temporary recovery database */
1014         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1015                                ctdb->db_directory_state,
1016                                ctdb->pnn);
1017         if (name == NULL) {
1018                 return NULL;
1019         }
1020         unlink(name);
1021
1022         tdb_flags = TDB_NOLOCK;
1023         if (ctdb->valgrinding) {
1024                 tdb_flags |= TDB_NOMMAP;
1025         }
1026         tdb_flags |= TDB_DISALLOW_NESTING;
1027
1028         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1029                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1030         if (recdb == NULL) {
1031                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1032         }
1033
1034         talloc_free(name);
1035
1036         return recdb;
1037 }
1038
1039
1040 /* 
1041    a traverse function for pulling all relevant records from recdb
1042  */
1043 struct recdb_data {
1044         struct ctdb_context *ctdb;
1045         struct ctdb_marshall_buffer *recdata;
1046         uint32_t len;
1047         bool failed;
1048         bool persistent;
1049 };
1050
1051 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1052 {
1053         struct recdb_data *params = (struct recdb_data *)p;
1054         struct ctdb_rec_data *rec;
1055         struct ctdb_ltdb_header *hdr;
1056
1057         /* skip empty records */
1058         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1059                 return 0;
1060         }
1061
1062         /* update the dmaster field to point to us */
1063         hdr = (struct ctdb_ltdb_header *)data.dptr;
1064         if (!params->persistent) {
1065                 hdr->dmaster = params->ctdb->pnn;
1066         }
1067
1068         /* add the record to the blob ready to send to the nodes */
1069         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1070         if (rec == NULL) {
1071                 params->failed = true;
1072                 return -1;
1073         }
1074         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1075         if (params->recdata == NULL) {
1076                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1077                          rec->length + params->len, params->recdata->count));
1078                 params->failed = true;
1079                 return -1;
1080         }
1081         params->recdata->count++;
1082         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1083         params->len += rec->length;
1084         talloc_free(rec);
1085
1086         return 0;
1087 }
1088
1089 /*
1090   push the recdb database out to all nodes
1091  */
1092 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1093                                bool persistent,
1094                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1095 {
1096         struct recdb_data params;
1097         struct ctdb_marshall_buffer *recdata;
1098         TDB_DATA outdata;
1099         TALLOC_CTX *tmp_ctx;
1100         uint32_t *nodes;
1101
1102         tmp_ctx = talloc_new(ctdb);
1103         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1104
1105         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1106         CTDB_NO_MEMORY(ctdb, recdata);
1107
1108         recdata->db_id = dbid;
1109
1110         params.ctdb = ctdb;
1111         params.recdata = recdata;
1112         params.len = offsetof(struct ctdb_marshall_buffer, data);
1113         params.failed = false;
1114         params.persistent = persistent;
1115
1116         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1117                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1118                 talloc_free(params.recdata);
1119                 talloc_free(tmp_ctx);
1120                 return -1;
1121         }
1122
1123         if (params.failed) {
1124                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1125                 talloc_free(params.recdata);
1126                 talloc_free(tmp_ctx);
1127                 return -1;              
1128         }
1129
1130         recdata = params.recdata;
1131
1132         outdata.dptr = (void *)recdata;
1133         outdata.dsize = params.len;
1134
1135         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1136         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1137                                         nodes, 0,
1138                                         CONTROL_TIMEOUT(), false, outdata,
1139                                         NULL, NULL,
1140                                         NULL) != 0) {
1141                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1142                 talloc_free(recdata);
1143                 talloc_free(tmp_ctx);
1144                 return -1;
1145         }
1146
1147         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1148                   dbid, recdata->count));
1149
1150         talloc_free(recdata);
1151         talloc_free(tmp_ctx);
1152
1153         return 0;
1154 }
1155
1156
1157 /*
1158   go through a full recovery on one database 
1159  */
1160 static int recover_database(struct ctdb_recoverd *rec, 
1161                             TALLOC_CTX *mem_ctx,
1162                             uint32_t dbid,
1163                             bool persistent,
1164                             uint32_t pnn, 
1165                             struct ctdb_node_map *nodemap,
1166                             uint32_t transaction_id)
1167 {
1168         struct tdb_wrap *recdb;
1169         int ret;
1170         struct ctdb_context *ctdb = rec->ctdb;
1171         TDB_DATA data;
1172         struct ctdb_control_wipe_database w;
1173         uint32_t *nodes;
1174
1175         recdb = create_recdb(ctdb, mem_ctx);
1176         if (recdb == NULL) {
1177                 return -1;
1178         }
1179
1180         /* pull all remote databases onto the recdb */
1181         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1182         if (ret != 0) {
1183                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1184                 return -1;
1185         }
1186
1187         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1188
1189         /* wipe all the remote databases. This is safe as we are in a transaction */
1190         w.db_id = dbid;
1191         w.transaction_id = transaction_id;
1192
1193         data.dptr = (void *)&w;
1194         data.dsize = sizeof(w);
1195
1196         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1198                                         nodes, 0,
1199                                         CONTROL_TIMEOUT(), false, data,
1200                                         NULL, NULL,
1201                                         NULL) != 0) {
1202                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1203                 talloc_free(recdb);
1204                 return -1;
1205         }
1206         
1207         /* push out the correct database. This sets the dmaster and skips 
1208            the empty records */
1209         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1210         if (ret != 0) {
1211                 talloc_free(recdb);
1212                 return -1;
1213         }
1214
1215         /* all done with this database */
1216         talloc_free(recdb);
1217
1218         return 0;
1219 }
1220
1221 /*
1222   reload the nodes file 
1223 */
1224 static void reload_nodes_file(struct ctdb_context *ctdb)
1225 {
1226         ctdb->nodes = NULL;
1227         ctdb_load_nodes_file(ctdb);
1228 }
1229
1230 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1231                                          struct ctdb_recoverd *rec,
1232                                          struct ctdb_node_map *nodemap,
1233                                          uint32_t *culprit)
1234 {
1235         int j;
1236         int ret;
1237
1238         if (ctdb->num_nodes != nodemap->num) {
1239                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1240                                   ctdb->num_nodes, nodemap->num));
1241                 if (culprit) {
1242                         *culprit = ctdb->pnn;
1243                 }
1244                 return -1;
1245         }
1246
1247         for (j=0; j<nodemap->num; j++) {
1248                 /* release any existing data */
1249                 if (ctdb->nodes[j]->known_public_ips) {
1250                         talloc_free(ctdb->nodes[j]->known_public_ips);
1251                         ctdb->nodes[j]->known_public_ips = NULL;
1252                 }
1253                 if (ctdb->nodes[j]->available_public_ips) {
1254                         talloc_free(ctdb->nodes[j]->available_public_ips);
1255                         ctdb->nodes[j]->available_public_ips = NULL;
1256                 }
1257
1258                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1259                         continue;
1260                 }
1261
1262                 /* grab a new shiny list of public ips from the node */
1263                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1264                                         CONTROL_TIMEOUT(),
1265                                         ctdb->nodes[j]->pnn,
1266                                         ctdb->nodes,
1267                                         0,
1268                                         &ctdb->nodes[j]->known_public_ips);
1269                 if (ret != 0) {
1270                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1271                                 ctdb->nodes[j]->pnn));
1272                         if (culprit) {
1273                                 *culprit = ctdb->nodes[j]->pnn;
1274                         }
1275                         return -1;
1276                 }
1277
1278                 if (rec->ip_check_disable_ctx == NULL) {
1279                         if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1280                                 DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1281                                 rec->need_takeover_run = true;
1282                         }
1283                 }
1284
1285                 /* grab a new shiny list of public ips from the node */
1286                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1287                                         CONTROL_TIMEOUT(),
1288                                         ctdb->nodes[j]->pnn,
1289                                         ctdb->nodes,
1290                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1291                                         &ctdb->nodes[j]->available_public_ips);
1292                 if (ret != 0) {
1293                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1294                                 ctdb->nodes[j]->pnn));
1295                         if (culprit) {
1296                                 *culprit = ctdb->nodes[j]->pnn;
1297                         }
1298                         return -1;
1299                 }
1300         }
1301
1302         return 0;
1303 }
1304
1305 /* when we start a recovery, make sure all nodes use the same reclock file
1306    setting
1307 */
1308 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1309 {
1310         struct ctdb_context *ctdb = rec->ctdb;
1311         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1312         TDB_DATA data;
1313         uint32_t *nodes;
1314
1315         if (ctdb->recovery_lock_file == NULL) {
1316                 data.dptr  = NULL;
1317                 data.dsize = 0;
1318         } else {
1319                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1320                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1321         }
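        /* an empty blob tells the other nodes that no recovery lock file is
           configured; otherwise the path is sent including its terminating
           NUL byte */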
1322
1323         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1324         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1325                                         nodes, 0,
1326                                         CONTROL_TIMEOUT(),
1327                                         false, data,
1328                                         NULL, NULL,
1329                                         rec) != 0) {
1330                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1331                 talloc_free(tmp_ctx);
1332                 return -1;
1333         }
1334
1335         talloc_free(tmp_ctx);
1336         return 0;
1337 }
1338
1339
1340 /*
1341   we are the recmaster, and recovery is needed - start a recovery run
1342  */
1343 static int do_recovery(struct ctdb_recoverd *rec, 
1344                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1345                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1346 {
1347         struct ctdb_context *ctdb = rec->ctdb;
1348         int i, j, ret;
1349         uint32_t generation;
1350         struct ctdb_dbid_map *dbmap;
1351         TDB_DATA data;
1352         uint32_t *nodes;
1353         struct timeval start_time;
1354         uint32_t culprit = (uint32_t)-1;
1355
1356         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1357
1358         /* if recovery fails, force it again */
1359         rec->need_recovery = true;
1360
1361         for (i=0; i<ctdb->num_nodes; i++) {
1362                 struct ctdb_banning_state *ban_state;
1363
1364                 if (ctdb->nodes[i]->ban_state == NULL) {
1365                         continue;
1366                 }
1367                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1368                 if (ban_state->count < 2*ctdb->num_nodes) {
1369                         continue;
1370                 }
1371                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1372                         ctdb->nodes[i]->pnn, ban_state->count,
1373                         ctdb->tunable.recovery_ban_period));
1374                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1375                 ban_state->count = 0;
1376         }
1377
1378
1379         if (ctdb->tunable.verify_recovery_lock != 0) {
1380                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1381                 start_time = timeval_current();
1382                 if (!ctdb_recovery_lock(ctdb, true)) {
1383                         ctdb_set_culprit(rec, pnn);
1384                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1385                         return -1;
1386                 }
1387                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1388                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1389         }
1390
1391         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1392
1393         /* get a list of all databases */
1394         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1395         if (ret != 0) {
1396                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1397                 return -1;
1398         }
1399
1400         /* we do the db creation before we set the recovery mode, so the freeze happens
1401            on all databases we will be dealing with. */
1402
1403         /* verify that we have all the databases any other node has */
1404         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1405         if (ret != 0) {
1406                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1407                 return -1;
1408         }
1409
1410         /* verify that all other nodes have all our databases */
1411         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1412         if (ret != 0) {
1413                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1414                 return -1;
1415         }
1416         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1417
1418         /* update the database priority for all remote databases */
1419         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1420         if (ret != 0) {
1421                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1422         }
1423         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1424
1425
1426         /* update all other nodes to use the same setting for reclock files
1427            as the local recovery master.
1428         */
1429         sync_recovery_lock_file_across_cluster(rec);
1430
1431         /* set recovery mode to active on all nodes */
1432         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1433         if (ret != 0) {
1434                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1435                 return -1;
1436         }
1437
1438         /* execute the "startrecovery" event script on all nodes */
1439         ret = run_startrecovery_eventscript(rec, nodemap);
1440         if (ret!=0) {
1441                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1442                 return -1;
1443         }
1444
1445         /*
1446           update all nodes to have the same flags that we have
1447          */
1448         for (i=0;i<nodemap->num;i++) {
1449                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1450                         continue;
1451                 }
1452
1453                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1454                 if (ret != 0) {
1455                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1456                         return -1;
1457                 }
1458         }
1459
1460         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1461
1462         /* pick a new generation number */
1463         generation = new_generation();
1464
1465         /* change the vnnmap on this node to use the new generation 
1466            number but not on any other nodes.
1467            this guarantees that if we abort the recovery prematurely
1468            for some reason (a node stops responding?)
1469            that we can just return immediately and we will reenter
1470            recovery shortly again.
1471            I.e. we deliberately leave the cluster with an inconsistent
1472            generation id to allow us to abort recovery at any stage and
1473            just restart it from scratch.
1474          */
1475         vnnmap->generation = generation;
1476         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1477         if (ret != 0) {
1478                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1479                 return -1;
1480         }
1481
1482         data.dptr = (void *)&generation;
1483         data.dsize = sizeof(uint32_t);
1484
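        /* The new generation number is sent as the payload of
           TRANSACTION_START to every active node, so each node starts its
           recovery transaction tagged with the same generation. */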
1485         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1486         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1487                                         nodes, 0,
1488                                         CONTROL_TIMEOUT(), false, data,
1489                                         NULL,
1490                                         transaction_start_fail_callback,
1491                                         rec) != 0) {
1492                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1493                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1494                                         nodes, 0,
1495                                         CONTROL_TIMEOUT(), false, tdb_null,
1496                                         NULL,
1497                                         NULL,
1498                                         NULL) != 0) {
1499                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1500                 }
1501                 return -1;
1502         }
1503
1504         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1505
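        /* Recover each database in turn under the new generation.  A
           failure on any database aborts do_recovery(); the transaction is
           never committed and a fresh recovery will be started shortly. */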
1506         for (i=0;i<dbmap->num;i++) {
1507                 ret = recover_database(rec, mem_ctx,
1508                                        dbmap->dbs[i].dbid,
1509                                        dbmap->dbs[i].persistent,
1510                                        pnn, nodemap, generation);
1511                 if (ret != 0) {
1512                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1513                         return -1;
1514                 }
1515         }
1516
1517         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1518
1519         /* commit all the changes */
1520         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1521                                         nodes, 0,
1522                                         CONTROL_TIMEOUT(), false, data,
1523                                         NULL, NULL,
1524                                         NULL) != 0) {
1525                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1526                 return -1;
1527         }
1528
1529         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1530         
1531
1532         /* update the capabilities for all nodes */
1533         ret = update_capabilities(ctdb, nodemap);
1534         if (ret!=0) {
1535                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1536                 return -1;
1537         }
1538
1539         /* build a new vnn map with all the currently active and
1540            unbanned nodes */
1541         generation = new_generation();
1542         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1543         CTDB_NO_MEMORY(ctdb, vnnmap);
1544         vnnmap->generation = generation;
1545         vnnmap->size = 0;
1546         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1547         CTDB_NO_MEMORY(ctdb, vnnmap->map);
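        /* Only nodes that are active and advertise the CTDB_CAP_LMASTER
           capability are added to the new vnnmap; if none qualify, the
           recovery master adds itself below as a fallback. */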
1548         for (i=j=0;i<nodemap->num;i++) {
1549                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1550                         continue;
1551                 }
1552                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1553                         /* this node cannot be an lmaster */
1554                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an lmaster, skipping it\n", i));
1555                         continue;
1556                 }
1557
1558                 vnnmap->size++;
1559                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1560                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1561                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1562
1563         }
1564         if (vnnmap->size == 0) {
1565                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1566                 vnnmap->size++;
1567                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1568                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1569                 vnnmap->map[0] = pnn;
1570         }       
1571
1572         /* update to the new vnnmap on all nodes */
1573         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1574         if (ret != 0) {
1575                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1576                 return -1;
1577         }
1578
1579         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1580
1581         /* update recmaster to point to us for all nodes */
1582         ret = set_recovery_master(ctdb, nodemap, pnn);
1583         if (ret!=0) {
1584                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1585                 return -1;
1586         }
1587
1588         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1589
1590         /*
1591           update all nodes to have the same flags that we have
1592          */
1593         for (i=0;i<nodemap->num;i++) {
1594                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1595                         continue;
1596                 }
1597
1598                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1599                 if (ret != 0) {
1600                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1601                         return -1;
1602                 }
1603         }
1604
1605         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1606
1607         /* disable recovery mode */
1608         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1609         if (ret != 0) {
1610                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1611                 return -1;
1612         }
1613
1614         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1615
1616         /*
1617           tell nodes to takeover their public IPs
1618          */
1619         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1620         if (ret != 0) {
1621                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1622                                  culprit));
1623                 return -1;
1624         }
1625         rec->need_takeover_run = false;
1626         ret = ctdb_takeover_run(ctdb, nodemap);
1627         if (ret != 0) {
1628                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1629                 return -1;
1630         }
1631         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1632
1633         /* execute the "recovered" event script on all nodes */
1634         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1635         if (ret!=0) {
1636                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1637                 return -1;
1638         }
1639
1640         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1641
1642         /* send a message to all clients telling them that the cluster 
1643            has been reconfigured */
1644         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1645
1646         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1647
1648         rec->need_recovery = false;
1649
1650         /* we managed to complete a full recovery, make sure to forgive
1651            any past sins by the nodes that could now participate in the
1652            recovery.
1653         */
1654         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1655         for (i=0;i<nodemap->num;i++) {
1656                 struct ctdb_banning_state *ban_state;
1657
1658                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1659                         continue;
1660                 }
1661
1662                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1663                 if (ban_state == NULL) {
1664                         continue;
1665                 }
1666
1667                 ban_state->count = 0;
1668         }
1669
1670
1671         /* We just finished a recovery successfully. 
1672            We now wait for rerecovery_timeout before we allow 
1673            another recovery to take place.
1674         */
1675         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries suppressed for the rerecovery timeout\n"));
1676         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1677         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1678
1679         return 0;
1680 }
1681
1682
1683 /*
1684   elections are won by first checking the number of connected nodes, then
1685   the priority time, then the pnn
1686  */
1687 struct election_message {
1688         uint32_t num_connected;
1689         struct timeval priority_time;
1690         uint32_t pnn;
1691         uint32_t node_flags;
1692 };
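/*
  The election_message structure is broadcast as-is (no marshalling) on
  CTDB_SRVID_RECOVERY.  ctdb_election_win() compares num_connected first,
  then priority_time (the longer-running node wins), and finally the pnn
  as a deterministic tie-breaker.  Note: sending the raw struct assumes
  all nodes share the same byte order and struct layout.
 */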
1693
1694 /*
1695   form this node's election data
1696  */
1697 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1698 {
1699         int ret, i;
1700         struct ctdb_node_map *nodemap;
1701         struct ctdb_context *ctdb = rec->ctdb;
1702
1703         ZERO_STRUCTP(em);
1704
1705         em->pnn = rec->ctdb->pnn;
1706         em->priority_time = rec->priority_time;
1707
1708         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1709         if (ret != 0) {
1710                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1711                 return;
1712         }
1713
1714         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1715         em->node_flags = rec->node_flags;
1716
1717         for (i=0;i<nodemap->num;i++) {
1718                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1719                         em->num_connected++;
1720                 }
1721         }
1722
1723         /* we shouldn't try to win this election if we can't be a recmaster */
1724         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1725                 em->num_connected = 0;
1726                 em->priority_time = timeval_current();
1727         }
1728
1729         talloc_free(nodemap);
1730 }
1731
1732 /*
1733   see if the given election data wins
1734  */
1735 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1736 {
1737         struct election_message myem;
1738         int cmp = 0;
1739
1740         ctdb_election_data(rec, &myem);
1741
1742         /* we can't win if we don't have the recmaster capability */
1743         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1744                 return false;
1745         }
1746
1747         /* we can't win if we are banned */
1748         if (rec->node_flags & NODE_FLAGS_BANNED) {
1749                 return false;
1750         }       
1751
1752         /* we can't win if we are stopped */
1753         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1754                 return false;
1755         }       
1756
1757         /* we will automatically win if the other node is banned */
1758         if (em->node_flags & NODE_FLAGS_BANNED) {
1759                 return true;
1760         }
1761
1762         /* we will automatically win if the other node is stopped */
1763         if (em->node_flags & NODE_FLAGS_STOPPED) {
1764                 return true;
1765         }
1766
1767         /* try to use the most connected node */
1768         if (cmp == 0) {
1769                 cmp = (int)myem.num_connected - (int)em->num_connected;
1770         }
1771
1772         /* then the longest running node */
1773         if (cmp == 0) {
1774                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1775         }
1776
1777         if (cmp == 0) {
1778                 cmp = (int)myem.pnn - (int)em->pnn;
1779         }
1780
1781         return cmp > 0;
1782 }
1783
1784 /*
1785   send out an election request
1786  */
1787 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1788 {
1789         int ret;
1790         TDB_DATA election_data;
1791         struct election_message emsg;
1792         uint64_t srvid;
1793         struct ctdb_context *ctdb = rec->ctdb;
1794
1795         srvid = CTDB_SRVID_RECOVERY;
1796
1797         ctdb_election_data(rec, &emsg);
1798
1799         election_data.dsize = sizeof(struct election_message);
1800         election_data.dptr  = (unsigned char *)&emsg;
1801
1802
1803         /* send an election message to all active nodes */
1804         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1805         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1806
1807
1808         /* A new node that is already frozen has entered the cluster.
1809            The existing nodes are not frozen and don't need to be frozen
1810            until the election has ended and we start the actual recovery
1811         */
1812         if (update_recmaster == true) {
1813                 /* first we assume we will win the election and set 
1814                    recoverymaster to be ourself on the current node
1815                  */
1816                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1817                 if (ret != 0) {
1818                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1819                         return -1;
1820                 }
1821         }
1822
1823
1824         return 0;
1825 }
1826
1827 /*
1828   this function will unban all nodes in the cluster
1829 */
1830 static void unban_all_nodes(struct ctdb_context *ctdb)
1831 {
1832         int ret, i;
1833         struct ctdb_node_map *nodemap;
1834         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1835         
1836         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1837         if (ret != 0) {
1838                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1839                 return;
1840         }
1841
1842         for (i=0;i<nodemap->num;i++) {
1843                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1844                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1845                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1846                 }
1847         }
1848
1849         talloc_free(tmp_ctx);
1850 }
1851
1852
1853 /*
1854   we think we are winning the election - send a broadcast election request
1855  */
1856 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1857 {
1858         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1859         int ret;
1860
1861         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1862         if (ret != 0) {
1863                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1864         }
1865
1866         talloc_free(rec->send_election_te);
1867         rec->send_election_te = NULL;
1868 }
1869
1870 /*
1871   handler for memory dumps
1872 */
1873 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1874                              TDB_DATA data, void *private_data)
1875 {
1876         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1877         TDB_DATA *dump;
1878         int ret;
1879         struct rd_memdump_reply *rd;
1880
1881         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1882                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1883                 talloc_free(tmp_ctx);
1884                 return;
1885         }
1886         rd = (struct rd_memdump_reply *)data.dptr;
1887
1888         dump = talloc_zero(tmp_ctx, TDB_DATA);
1889         if (dump == NULL) {
1890                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1891                 talloc_free(tmp_ctx);
1892                 return;
1893         }
1894         ret = ctdb_dump_memory(ctdb, dump);
1895         if (ret != 0) {
1896                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1897                 talloc_free(tmp_ctx);
1898                 return;
1899         }
1900
1901         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1902
1903         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1904         if (ret != 0) {
1905                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1906                 talloc_free(tmp_ctx);
1907                 return;
1908         }
1909
1910         talloc_free(tmp_ctx);
1911 }
1912
1913 /*
1914   handler for reload_nodes
1915 */
1916 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1917                              TDB_DATA data, void *private_data)
1918 {
1919         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1920
1921         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1922
1923         reload_nodes_file(rec->ctdb);
1924 }
1925
1926
1927 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1928                               struct timeval yt, void *p)
1929 {
1930         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1931
1932         talloc_free(rec->ip_check_disable_ctx);
1933         rec->ip_check_disable_ctx = NULL;
1934 }
1935
1936
1937 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1938                              TDB_DATA data, void *private_data)
1939 {
1940         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1941         struct ctdb_public_ip *ip;
1942
1943         if (rec->recmaster != rec->ctdb->pnn) {
1944                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1945                 return;
1946         }
1947
1948         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1949                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1950                 return;
1951         }
1952
1953         ip = (struct ctdb_public_ip *)data.dptr;
1954
1955         update_ip_assignment_tree(rec->ctdb, ip);
1956 }
1957
1958
1959 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1960                              TDB_DATA data, void *private_data)
1961 {
1962         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1963         uint32_t timeout;
1964
1965         if (rec->ip_check_disable_ctx != NULL) {
1966                 talloc_free(rec->ip_check_disable_ctx);
1967                 rec->ip_check_disable_ctx = NULL;
1968         }
1969
1970         if (data.dsize != sizeof(uint32_t)) {
1971                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1972                                  "expecting %lu\n", (long unsigned)data.dsize,
1973                                  (long unsigned)sizeof(uint32_t)));
1974                 return;
1975         }
1976         if (data.dptr == NULL) {
1977                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1978                 return;
1979         }
1980
1981         timeout = *((uint32_t *)data.dptr);
1982         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1983
1984         rec->ip_check_disable_ctx = talloc_new(rec);
1985         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1986
1987         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1988 }
1989
1990
1991 /*
1992   handler for ip reallocate, just add it to the list of callers and 
1993   handle this later in the monitor_cluster loop so we do not recurse
1994   with other callers to takeover_run()
1995 */
1996 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1997                              TDB_DATA data, void *private_data)
1998 {
1999         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2000         struct ip_reallocate_list *caller;
2001
2002         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2003                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2004                 return;
2005         }
2006
2007         if (rec->ip_reallocate_ctx == NULL) {
2008                 rec->ip_reallocate_ctx = talloc_new(rec);
2009                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2010         }
2011
2012         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2013         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2014
2015         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2016         caller->next = rec->reallocate_callers;
2017         rec->reallocate_callers = caller;
2018
2019         return;
2020 }
2021
2022 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2023 {
2024         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2025         TDB_DATA result;
2026         int32_t ret;
2027         struct ip_reallocate_list *callers;
2028         uint32_t culprit;
2029
2030         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2031
2032         /* update the list of public ips that a node can handle for
2033            all connected nodes
2034         */
2035         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2036         if (ret != 0) {
2037                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2038                                  culprit));
2039                 rec->need_takeover_run = true;
2040         }
2041         if (ret == 0) {
2042                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2043                 if (ret != 0) {
2044                         DEBUG(DEBUG_ERR,("ctdb_takeover_run() failed during forced "
2045                                          "ip reallocation\n"));
2046                         rec->need_takeover_run = true;
2047                 }
2048         }
2049
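        /* Reply to every caller that registered for a response: the reply
           payload is the int32_t status of the reallocation, sent back to
           the pnn/srvid recorded when the request was queued. */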
2050         result.dsize = sizeof(int32_t);
2051         result.dptr  = (uint8_t *)&ret;
2052
2053         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2054
2055                 /* Someone that sent srvid==0 does not want a reply */
2056                 if (callers->rd->srvid == 0) {
2057                         continue;
2058                 }
2059                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2060                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2061                                   (unsigned long long)callers->rd->srvid));
2062                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2063                 if (ret != 0) {
2064                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2065                                          "message to %u:%llu\n",
2066                                          (unsigned)callers->rd->pnn,
2067                                          (unsigned long long)callers->rd->srvid));
2068                 }
2069         }
2070
2071         talloc_free(tmp_ctx);
2072         talloc_free(rec->ip_reallocate_ctx);
2073         rec->ip_reallocate_ctx = NULL;
2074         rec->reallocate_callers = NULL;
2075         
2076 }
2077
2078
2079 /*
2080   handler for recovery master elections
2081 */
2082 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2083                              TDB_DATA data, void *private_data)
2084 {
2085         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2086         int ret;
2087         struct election_message *em = (struct election_message *)data.dptr;
2088         TALLOC_CTX *mem_ctx;
2089
2090         /* we got an election packet - update the timeout for the election */
2091         talloc_free(rec->election_timeout);
2092         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2093                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2094                                                 ctdb_election_timeout, rec);
2095
2096         mem_ctx = talloc_new(ctdb);
2097
2098         /* someone called an election. check their election data
2099            and if we disagree and we would rather be the elected node, 
2100            send a new election message to all other nodes
2101          */
2102         if (ctdb_election_win(rec, em)) {
2103                 if (!rec->send_election_te) {
2104                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2105                                                                 timeval_current_ofs(0, 500000),
2106                                                                 election_send_request, rec);
2107                 }
2108                 talloc_free(mem_ctx);
2109                 /*unban_all_nodes(ctdb);*/
2110                 return;
2111         }
2112         
2113         /* we didn't win */
2114         talloc_free(rec->send_election_te);
2115         rec->send_election_te = NULL;
2116
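        /* We lost the election.  If another node won and we are still
           holding the recovery lock file, release it so the new recmaster
           can take it, and unban all nodes. */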
2117         if (ctdb->tunable.verify_recovery_lock != 0) {
2118                 /* release the recmaster lock */
2119                 if (em->pnn != ctdb->pnn &&
2120                     ctdb->recovery_lock_fd != -1) {
2121                         close(ctdb->recovery_lock_fd);
2122                         ctdb->recovery_lock_fd = -1;
2123                         unban_all_nodes(ctdb);
2124                 }
2125         }
2126
2127         /* ok, let that guy become recmaster then */
2128         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2129         if (ret != 0) {
2130                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2131                 talloc_free(mem_ctx);
2132                 return;
2133         }
2134
2135         talloc_free(mem_ctx);
2136         return;
2137 }
2138
2139
2140 /*
2141   force the start of the election process
2142  */
2143 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2144                            struct ctdb_node_map *nodemap)
2145 {
2146         int ret;
2147         struct ctdb_context *ctdb = rec->ctdb;
2148
2149         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2150
2151         /* set all nodes to recovery mode to stop all internode traffic */
2152         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2153         if (ret != 0) {
2154                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2155                 return;
2156         }
2157
2158         talloc_free(rec->election_timeout);
2159         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2160                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2161                                                 ctdb_election_timeout, rec);
2162
2163         ret = send_election_request(rec, pnn, true);
2164         if (ret!=0) {
2165                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2166                 return;
2167         }
2168
2169         /* wait for a few seconds to collect all responses */
2170         ctdb_wait_election(rec);
2171 }
2172
2173
2174
2175 /*
2176   handler for when a node changes its flags
2177 */
2178 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2179                             TDB_DATA data, void *private_data)
2180 {
2181         int ret;
2182         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2183         struct ctdb_node_map *nodemap=NULL;
2184         TALLOC_CTX *tmp_ctx;
2185         uint32_t changed_flags;
2186         int i;
2187         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2188         int disabled_flag_changed;
2189
2190         if (data.dsize != sizeof(*c)) {
2191                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2192                 return;
2193         }
2194
2195         tmp_ctx = talloc_new(ctdb);
2196         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2197
2198         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2199         if (ret != 0) {
2200                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2201                 talloc_free(tmp_ctx);
2202                 return;         
2203         }
2204
2205
2206         for (i=0;i<nodemap->num;i++) {
2207                 if (nodemap->nodes[i].pnn == c->pnn) break;
2208         }
2209
2210         if (i == nodemap->num) {
2211                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2212                 talloc_free(tmp_ctx);
2213                 return;
2214         }
2215
2216         changed_flags = c->old_flags ^ c->new_flags;
2217
2218         if (nodemap->nodes[i].flags != c->new_flags) {
2219                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2220         }
2221
2222         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2223
2224         nodemap->nodes[i].flags = c->new_flags;
2225
2226         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2227                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2228
2229         if (ret == 0) {
2230                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2231                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2232         }
2233         
2234         if (ret == 0 &&
2235             ctdb->recovery_master == ctdb->pnn &&
2236             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2237                 /* Only do the takeover run if the perm disabled or unhealthy
2238                    flags changed since these will cause an ip failover but not
2239                    a recovery.
2240                    If the node became disconnected or banned this will also
2241                    lead to an ip address failover but that is handled 
2242                    during recovery
2243                 */
2244                 if (disabled_flag_changed) {
2245                         rec->need_takeover_run = true;
2246                 }
2247         }
2248
2249         talloc_free(tmp_ctx);
2250 }
2251
2252 /*
2253   handler for when we need to push out flag changes to all other nodes
2254 */
2255 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2256                             TDB_DATA data, void *private_data)
2257 {
2258         int ret;
2259         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2260         struct ctdb_node_map *nodemap=NULL;
2261         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2262         uint32_t recmaster;
2263         uint32_t *nodes;
2264
2265         /* find the recovery master */
2266         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2267         if (ret != 0) {
2268                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2269                 talloc_free(tmp_ctx);
2270                 return;
2271         }
2272
2273         /* read the node flags from the recmaster */
2274         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2275         if (ret != 0) {
2276                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", recmaster));
2277                 talloc_free(tmp_ctx);
2278                 return;
2279         }
2280         if (c->pnn >= nodemap->num) {
2281                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2282                 talloc_free(tmp_ctx);
2283                 return;
2284         }
2285
2286         /* send the flags update to all connected nodes */
2287         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2288
2289         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2290                                       nodes, 0, CONTROL_TIMEOUT(),
2291                                       false, data,
2292                                       NULL, NULL,
2293                                       NULL) != 0) {
2294                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2295
2296                 talloc_free(tmp_ctx);
2297                 return;
2298         }
2299
2300         talloc_free(tmp_ctx);
2301 }
2302
2303
2304 struct verify_recmode_normal_data {
2305         uint32_t count;
2306         enum monitor_result status;
2307 };
2308
2309 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2310 {
2311         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2312
2313
2314         /* one more node has responded with recmode data*/
2315         rmdata->count--;
2316
2317         /* if we failed to get the recmode, then return an error and let
2318            the main loop try again.
2319         */
2320         if (state->state != CTDB_CONTROL_DONE) {
2321                 if (rmdata->status == MONITOR_OK) {
2322                         rmdata->status = MONITOR_FAILED;
2323                 }
2324                 return;
2325         }
2326
2327         /* if we got a response, then the recmode will be stored in the
2328            status field
2329         */
2330         if (state->status != CTDB_RECOVERY_NORMAL) {
2331                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2332                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2333         }
2334
2335         return;
2336 }
2337
2338
2339 /* verify that all nodes are in normal recovery mode */
2340 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2341 {
2342         struct verify_recmode_normal_data *rmdata;
2343         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2344         struct ctdb_client_control_state *state;
2345         enum monitor_result status;
2346         int j;
2347         
2348         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2349         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2350         rmdata->count  = 0;
2351         rmdata->status = MONITOR_OK;
2352
2353         /* loop over all active nodes and send an async getrecmode call to 
2354            them*/
2355         for (j=0; j<nodemap->num; j++) {
2356                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2357                         continue;
2358                 }
2359                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2360                                         CONTROL_TIMEOUT(), 
2361                                         nodemap->nodes[j].pnn);
2362                 if (state == NULL) {
2363                         /* we failed to send the control, treat this as 
2364                            an error and try again next iteration
2365                         */                      
2366                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2367                         talloc_free(mem_ctx);
2368                         return MONITOR_FAILED;
2369                 }
2370
2371                 /* set up the callback functions */
2372                 state->async.fn = verify_recmode_normal_callback;
2373                 state->async.private_data = rmdata;
2374
2375                 /* one more control to wait for to complete */
2376                 rmdata->count++;
2377         }
2378
2379
2380         /* now wait for up to the maximum number of seconds allowed
2381            or until all nodes we expect a response from have replied
2382         */
2383         while (rmdata->count > 0) {
2384                 event_loop_once(ctdb->ev);
2385         }
2386
2387         status = rmdata->status;
2388         talloc_free(mem_ctx);
2389         return status;
2390 }
2391
2392
2393 struct verify_recmaster_data {
2394         struct ctdb_recoverd *rec;
2395         uint32_t count;
2396         uint32_t pnn;
2397         enum monitor_result status;
2398 };
2399
2400 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2401 {
2402         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2403
2404
2405         /* one more node has responded with recmaster data*/
2406         rmdata->count--;
2407
2408         /* if we failed to get the recmaster, then return an error and let
2409            the main loop try again.
2410         */
2411         if (state->state != CTDB_CONTROL_DONE) {
2412                 if (rmdata->status == MONITOR_OK) {
2413                         rmdata->status = MONITOR_FAILED;
2414                 }
2415                 return;
2416         }
2417
2418         /* if we got a response, then the recmaster will be stored in the
2419            status field
2420         */
2421         if (state->status != rmdata->pnn) {
2422                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2423                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2424                 rmdata->status = MONITOR_ELECTION_NEEDED;
2425         }
2426
2427         return;
2428 }
2429
2430
2431 /* verify that all nodes agree that we are the recmaster */
2432 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2433 {
2434         struct ctdb_context *ctdb = rec->ctdb;
2435         struct verify_recmaster_data *rmdata;
2436         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2437         struct ctdb_client_control_state *state;
2438         enum monitor_result status;
2439         int j;
2440         
2441         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2442         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2443         rmdata->rec    = rec;
2444         rmdata->count  = 0;
2445         rmdata->pnn    = pnn;
2446         rmdata->status = MONITOR_OK;
2447
2448         /* loop over all active nodes and send an async getrecmaster call to 
2449            them*/
2450         for (j=0; j<nodemap->num; j++) {
2451                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2452                         continue;
2453                 }
2454                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2455                                         CONTROL_TIMEOUT(),
2456                                         nodemap->nodes[j].pnn);
2457                 if (state == NULL) {
2458                         /* we failed to send the control, treat this as 
2459                            an error and try again next iteration
2460                         */                      
2461                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2462                         talloc_free(mem_ctx);
2463                         return MONITOR_FAILED;
2464                 }
2465
2466                 /* set up the callback functions */
2467                 state->async.fn = verify_recmaster_callback;
2468                 state->async.private_data = rmdata;
2469
2470                 /* one more control to wait for to complete */
2471                 rmdata->count++;
2472         }
2473
2474
2475         /* now wait for up to the maximum number of seconds allowed
2476            or until all nodes we expect a response from have replied
2477         */
2478         while (rmdata->count > 0) {
2479                 event_loop_once(ctdb->ev);
2480         }
2481
2482         status = rmdata->status;
2483         talloc_free(mem_ctx);
2484         return status;
2485 }
2486
2487
2488 /* called to check that the local allocation of public ip addresses is ok.
2489 */
2490 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2491 {
2492         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2493         struct ctdb_control_get_ifaces *ifaces = NULL;
2494         struct ctdb_all_public_ips *ips = NULL;
2495         struct ctdb_uptime *uptime1 = NULL;
2496         struct ctdb_uptime *uptime2 = NULL;
2497         int ret, j;
2498         bool need_iface_check = false;
2499         bool need_takeover_run = false;
2500
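        /* The node uptime is sampled both before and after reading the
           interface and public ip lists; if a recovery started or finished
           in between, the snapshot may be stale and the check is skipped
           further down. */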
2501         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2502                                 CTDB_CURRENT_NODE, &uptime1);
2503         if (ret != 0) {
2504                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2505                 talloc_free(mem_ctx);
2506                 return -1;
2507         }
2508
2509
2510         /* read the interfaces from the local node */
2511         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2512         if (ret != 0) {
2513                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2514                 talloc_free(mem_ctx);
2515                 return -1;
2516         }
2517
2518         if (!rec->ifaces) {
2519                 need_iface_check = true;
2520         } else if (rec->ifaces->num != ifaces->num) {
2521                 need_iface_check = true;
2522         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2523                 need_iface_check = true;
2524         }
2525
2526         if (need_iface_check) {
2527                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2528                                      "local node %u - force takeover run\n",
2529                                      pnn));
2530                 need_takeover_run = true;
2531         }
2532
2533         /* read the ip allocation from the local node */
2534         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2535         if (ret != 0) {
2536                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2537                 talloc_free(mem_ctx);
2538                 return -1;
2539         }
2540
2541         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2542                                 CTDB_CURRENT_NODE, &uptime2);
2543         if (ret != 0) {
2544                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2545                 talloc_free(mem_ctx);
2546                 return -1;
2547         }
2548
2549         /* skip the check if the startrecovery time has changed */
2550         if (timeval_compare(&uptime1->last_recovery_started,
2551                             &uptime2->last_recovery_started) != 0) {
2552                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2553                 talloc_free(mem_ctx);
2554                 return 0;
2555         }
2556
2557         /* skip the check if the endrecovery time has changed */
2558         if (timeval_compare(&uptime1->last_recovery_finished,
2559                             &uptime2->last_recovery_finished) != 0) {
2560                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2561                 talloc_free(mem_ctx);
2562                 return 0;
2563         }
2564
2565         /* skip the check if we have started but not finished recovery */
2566         if (timeval_compare(&uptime1->last_recovery_finished,
2567                             &uptime1->last_recovery_started) != 1) {
2568                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2569                 talloc_free(mem_ctx);
2570
2571                 return 0;
2572         }
2573
2574         talloc_free(rec->ifaces);
2575         rec->ifaces = talloc_steal(rec, ifaces);
2576
2577         /* verify that we have the ip addresses we should have
2578            and we don't have ones we shouldn't have.
2579            if we find an inconsistency we set recmode to
2580            active on the local node and wait for the recmaster
2581            to do a full blown recovery
2582         */
2583         for (j=0; j<ips->num; j++) {
2584                 if (ips->ips[j].pnn == pnn) {
2585                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2586                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2587                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2588                                 need_takeover_run = true;
2589                         }
2590                 } else {
2591                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2592                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2593                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2594                                 need_takeover_run = true;
2595                         }
2596                 }
2597         }
2598
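        /* Ask the recovery master to run an ip takeover.  srvid is set to
           0, so no reply is expected (the reallocation code skips callers
           with srvid==0). */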
2599         if (need_takeover_run) {
2600                 struct takeover_run_reply rd;
2601                 TDB_DATA data;
2602
2603                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2604
2605                 rd.pnn = ctdb->pnn;
2606                 rd.srvid = 0;
2607                 data.dptr = (uint8_t *)&rd;
2608                 data.dsize = sizeof(rd);
2609
2610                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2611                 if (ret != 0) {
2612                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2613                 }
2614         }
2615         talloc_free(mem_ctx);
2616         return 0;
2617 }
2618
2619
2620 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2621 {
2622         struct ctdb_node_map **remote_nodemaps = callback_data;
2623
2624         if (node_pnn >= ctdb->num_nodes) {
2625                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2626                 return;
2627         }
2628
2629         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2630
2631 }
2632
2633 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2634         struct ctdb_node_map *nodemap,
2635         struct ctdb_node_map **remote_nodemaps)
2636 {
2637         uint32_t *nodes;
2638
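        /* Fan out a GET_NODEMAP control to all active nodes;
           async_getnodemap_callback() stores each reply in remote_nodemaps[]
           indexed by the replying node's pnn. */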
2639         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2640         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2641                                         nodes, 0,
2642                                         CONTROL_TIMEOUT(), false, tdb_null,
2643                                         async_getnodemap_callback,
2644                                         NULL,
2645                                         remote_nodemaps) != 0) {
2646                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2647
2648                 return -1;
2649         }
2650
2651         return 0;
2652 }
2653
2654 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2655 struct ctdb_check_reclock_state {
2656         struct ctdb_context *ctdb;
2657         struct timeval start_time;
2658         int fd[2];
2659         pid_t child;
2660         struct timed_event *te;
2661         struct fd_event *fde;
2662         enum reclock_child_status status;
2663 };
2664
2665 /* when we free the reclock state we must kill any child process.
2666 */
2667 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2668 {
2669         struct ctdb_context *ctdb = state->ctdb;
2670
2671         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2672
2673         if (state->fd[0] != -1) {
2674                 close(state->fd[0]);
2675                 state->fd[0] = -1;
2676         }
2677         if (state->fd[1] != -1) {
2678                 close(state->fd[1]);
2679                 state->fd[1] = -1;
2680         }
2681         kill(state->child, SIGKILL);
2682         return 0;
2683 }
2684
2685 /*
2686   called if our check_reclock child times out. this would happen if
2687   i/o to the reclock file blocks.
2688  */
2689 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2690                                          struct timeval t, void *private_data)
2691 {
2692         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2693                                            struct ctdb_check_reclock_state);
2694
2695         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - is the cluster filesystem slow to grant locks?\n"));
2696         state->status = RECLOCK_TIMEOUT;
2697 }
2698
2699 /* this is called when the child process has completed checking the reclock
2700    file and has written data back to us through the pipe.
2701 */
2702 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2703                              uint16_t flags, void *private_data)
2704 {
2705         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2706                                              struct ctdb_check_reclock_state);
2707         char c = 0;
2708         int ret;
2709
2710         /* we got a response from our child process so we can abort the
2711            timeout.
2712         */
2713         talloc_free(state->te);
2714         state->te = NULL;
2715
2716         ret = read(state->fd[0], &c, 1);
2717         if (ret != 1 || c != RECLOCK_OK) {
2718                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2719                 state->status = RECLOCK_FAILED;
2720
2721                 return;
2722         }
2723
2724         state->status = RECLOCK_OK;
2725         return;
2726 }
2727
2728 static int check_recovery_lock(struct ctdb_context *ctdb)
2729 {
2730         int ret;
2731         struct ctdb_check_reclock_state *state;
2732         pid_t parent = getpid();
2733
2734         if (ctdb->recovery_lock_fd == -1) {
2735                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2736                 return -1;
2737         }
2738
2739         state = talloc(ctdb, struct ctdb_check_reclock_state);
2740         CTDB_NO_MEMORY(ctdb, state);
2741
2742         state->ctdb = ctdb;
2743         state->start_time = timeval_current();
2744         state->status = RECLOCK_CHECKING;
2745         state->fd[0] = -1;
2746         state->fd[1] = -1;
2747
2748         ret = pipe(state->fd);
2749         if (ret != 0) {
2750                 talloc_free(state);
2751                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2752                 return -1;
2753         }
2754
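        /* Fork a child to do the actual read of the reclock file.  If i/o
           to the cluster filesystem hangs, only the child blocks; the
           parent gives up after the 15 second timed event below and marks
           the check as RECLOCK_TIMEOUT. */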
2755         state->child = fork();
2756         if (state->child == (pid_t)-1) {
2757                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2758                 close(state->fd[0]);
2759                 state->fd[0] = -1;
2760                 close(state->fd[1]);
2761                 state->fd[1] = -1;
2762                 talloc_free(state);
2763                 return -1;
2764         }
2765
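             /* child: do the (possibly blocking) pread() on the reclock file
                and report the result back through the pipe, so a hung cluster
                filesystem can never block the recovery daemon itself */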
2766         if (state->child == 0) {
2767                 char cc = RECLOCK_OK;
2768                 close(state->fd[0]);
2769                 state->fd[0] = -1;
2770
2771                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2772                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2773                         cc = RECLOCK_FAILED;
2774                 }
2775
2776                 write(state->fd[1], &cc, 1);
2777                 /* make sure we die when our parent dies */
2778                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2779                         sleep(5);
2780                         write(state->fd[1], &cc, 1);
2781                 }
2782                 _exit(0);
2783         }
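             /* parent: close the write end of the pipe and wait for the
                child's answer (or the timeout below) via the event loop */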
2784         close(state->fd[1]);
2785         state->fd[1] = -1;
2786         set_close_on_exec(state->fd[0]);
2787
2788         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2789
2790         talloc_set_destructor(state, check_reclock_destructor);
2791
2792         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2793                                     ctdb_check_reclock_timeout, state);
2794         if (state->te == NULL) {
2795                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2796                 talloc_free(state);
2797                 return -1;
2798         }
2799
2800         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2801                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2802                                 reclock_child_handler,
2803                                 (void *)state);
2804
2805         if (state->fde == NULL) {
2806                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2807                 talloc_free(state);
2808                 return -1;
2809         }
2810
2811         while (state->status == RECLOCK_CHECKING) {
2812                 event_loop_once(ctdb->ev);
2813         }
2814
2815         if (state->status == RECLOCK_FAILED) {
2816                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2817                 close(ctdb->recovery_lock_fd);
2818                 ctdb->recovery_lock_fd = -1;
2819                 talloc_free(state);
2820                 return -1;
2821         }
2822
2823         talloc_free(state);
2824         return 0;
2825 }
2826
2827 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2828 {
2829         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2830         const char *reclockfile;
2831
2832         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2833                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2834                 talloc_free(tmp_ctx);
2835                 return -1;      
2836         }
2837
2838         if (reclockfile == NULL) {
2839                 if (ctdb->recovery_lock_file != NULL) {
2840                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2841                         talloc_free(ctdb->recovery_lock_file);
2842                         ctdb->recovery_lock_file = NULL;
2843                         if (ctdb->recovery_lock_fd != -1) {
2844                                 close(ctdb->recovery_lock_fd);
2845                                 ctdb->recovery_lock_fd = -1;
2846                         }
2847                 }
2848                 ctdb->tunable.verify_recovery_lock = 0;
2849                 talloc_free(tmp_ctx);
2850                 return 0;
2851         }
2852
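             /* the daemon has a reclock file configured that we have not seen
                before - adopt it and drop any previously open reclock fd */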
2853         if (ctdb->recovery_lock_file == NULL) {
2854                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2855                 if (ctdb->recovery_lock_fd != -1) {
2856                         close(ctdb->recovery_lock_fd);
2857                         ctdb->recovery_lock_fd = -1;
2858                 }
2859                 talloc_free(tmp_ctx);
2860                 return 0;
2861         }
2862
2863
2864         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2865                 talloc_free(tmp_ctx);
2866                 return 0;
2867         }
2868
2869         talloc_free(ctdb->recovery_lock_file);
2870         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2871         ctdb->tunable.verify_recovery_lock = 0;
2872         if (ctdb->recovery_lock_fd != -1) {
2873                 close(ctdb->recovery_lock_fd);
2874                 ctdb->recovery_lock_fd = -1;
2875         }
2876
2877         talloc_free(tmp_ctx);
2878         return 0;
2879 }
2880                 
2881 /*
2882   the main monitoring loop
2883  */
2884 static void monitor_cluster(struct ctdb_context *ctdb)
2885 {
2886         uint32_t pnn;
2887         TALLOC_CTX *mem_ctx=NULL;
2888         struct ctdb_node_map *nodemap=NULL;
2889         struct ctdb_node_map *recmaster_nodemap=NULL;
2890         struct ctdb_node_map **remote_nodemaps=NULL;
2891         struct ctdb_vnn_map *vnnmap=NULL;
2892         struct ctdb_vnn_map *remote_vnnmap=NULL;
2893         int32_t debug_level;
2894         int i, j, ret;
2895         struct ctdb_recoverd *rec;
2896
2897         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2898
2899         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2900         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2901
2902         rec->ctdb = ctdb;
2903
2904         rec->priority_time = timeval_current();
2905
2906         /* register a message port for sending memory dumps */
2907         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2908
2909         /* register a message port for recovery elections */
2910         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2911
2912         /* when nodes are disabled/enabled */
2913         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2914
2915         /* when we are asked to push out a flag change */
2916         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2917
2918         /* register a message port for vacuum fetch */
2919         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2920
2921         /* register a message port for reloadnodes  */
2922         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2923
2924         /* register a message port for performing a takeover run */
2925         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2926
2927         /* register a message port for disabling the ip check for a short while */
2928         ctdb_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
2929
2930         /* register a message port for updating the recovery daemons node assignment for an ip */
2931         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
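             /* All of the handlers registered above are defined earlier in this
                file and share the ctdb_set_message_handler() callback signature.
                Roughly, as a sketch (example_handler is a hypothetical name):

                    static void example_handler(struct ctdb_context *ctdb,
                                                uint64_t srvid, TDB_DATA data,
                                                void *private_data)
                    {
                            struct ctdb_recoverd *rec = talloc_get_type(
                                    private_data, struct ctdb_recoverd);
                            ...
                    }
             */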
2932
2933 again:
2934         if (mem_ctx) {
2935                 talloc_free(mem_ctx);
2936                 mem_ctx = NULL;
2937         }
2938         mem_ctx = talloc_new(ctdb);
2939         if (!mem_ctx) {
2940                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2941                 exit(-1);
2942         }
2943
2944         /* we only check for recovery once every recover_interval seconds */
2945         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2946
2947         /* verify that the main daemon is still running */
2948         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2949                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2950                 exit(-1);
2951         }
2952
2953         /* ping the local daemon to tell it we are alive */
2954         ctdb_ctrl_recd_ping(ctdb);
2955
2956         if (rec->election_timeout) {
2957                 /* an election is in progress */
2958                 goto again;
2959         }
2960
2961         /* read the debug level from the parent and update locally */
2962         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2963         if (ret !=0) {
2964                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2965                 goto again;
2966         }
2967         LogLevel = debug_level;
2968
2969
2970         /* We must check whether we need to ban a node here, and we want to do
2971            this as early as possible so we do not wait until we have pulled the
2972            node map from the local node. That is why the value 20 is hardcoded.
2973         */
2974         for (i=0; i<ctdb->num_nodes; i++) {
2975                 struct ctdb_banning_state *ban_state;
2976
2977                 if (ctdb->nodes[i]->ban_state == NULL) {
2978                         continue;
2979                 }
2980                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2981                 if (ban_state->count < 20) {
2982                         continue;
2983                 }
2984                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2985                         ctdb->nodes[i]->pnn, ban_state->count,
2986                         ctdb->tunable.recovery_ban_period));
2987                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2988                 ban_state->count = 0;
2989         }
2990
2991         /* get relevant tunables */
2992         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2993         if (ret != 0) {
2994                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2995                 goto again;
2996         }
2997
2998         /* get the current recovery lock file from the server */
2999         if (update_recovery_lock_file(ctdb) != 0) {
3000                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3001                 goto again;
3002         }
3003
3004         /* Make sure that if recovery lock verification has been disabled
3005            we also close the reclock file
3006         */
3007         if (ctdb->tunable.verify_recovery_lock == 0) {
3008                 if (ctdb->recovery_lock_fd != -1) {
3009                         close(ctdb->recovery_lock_fd);
3010                         ctdb->recovery_lock_fd = -1;
3011                 }
3012         }
3013
3014         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3015         if (pnn == (uint32_t)-1) {
3016                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
3017                 goto again;
3018         }
3019
3020         /* get the vnnmap */
3021         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3022         if (ret != 0) {
3023                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3024                 goto again;
3025         }
3026
3027
3028         /* get number of nodes */
3029         if (rec->nodemap) {
3030                 talloc_free(rec->nodemap);
3031                 rec->nodemap = NULL;
3032                 nodemap=NULL;
3033         }
3034         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3035         if (ret != 0) {
3036                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3037                 goto again;
3038         }
3039         nodemap = rec->nodemap;
3040
3041         /* check which node is the recovery master */
3042         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3043         if (ret != 0) {
3044                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3045                 goto again;
3046         }
3047
3048         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3049         if (rec->recmaster != pnn) {
3050                 if (rec->ip_reallocate_ctx != NULL) {
3051                         talloc_free(rec->ip_reallocate_ctx);
3052                         rec->ip_reallocate_ctx = NULL;
3053                         rec->reallocate_callers = NULL;
3054                 }
3055         }
3056         /* if there are takeovers requested, perform them and notify the waiters */
3057         if (rec->reallocate_callers) {
3058                 process_ipreallocate_requests(ctdb, rec);
3059         }
3060
3061         if (rec->recmaster == (uint32_t)-1) {
3062                 DEBUG(DEBUG_NOTICE,(__location__ " No recovery master set yet - forcing election\n"));
3063                 force_election(rec, pnn, nodemap);
3064                 goto again;
3065         }
3066
3067
3068         /* if the local daemon is STOPPED, we verify that the databases are
3069            also frozen and that the recmode is set to active
3070         */
3071         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3072                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3073                 if (ret != 0) {
3074                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3075                 }
3076                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3077                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3078
3079                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3080                         if (ret != 0) {
3081                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3082                                 goto again;
3083                         }
3084                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3085                         if (ret != 0) {
3086                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3087
3088                                 goto again;
3089                         }
3090                         goto again;
3091                 }
3092         }
3093         /* If the local node is stopped, check whether we are the recmaster
3094            and, if so, yield that role
3095         */
3096         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3097                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3098                 force_election(rec, pnn, nodemap);
3099                 goto again;
3100         }
3101         
3102         /* check that we (recovery daemon) and the local ctdb daemon
3103            agree on whether we are banned or not
3104         */
3105 //qqq
3106
3107         /* remember our own node flags */
3108         rec->node_flags = nodemap->nodes[pnn].flags;
3109
3110         /* count how many active nodes there are */
3111         rec->num_active    = 0;
3112         rec->num_connected = 0;
3113         for (i=0; i<nodemap->num; i++) {
3114                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3115                         rec->num_active++;
3116                 }
3117                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3118                         rec->num_connected++;
3119                 }
3120         }
3121
3122
3123         /* verify that the recmaster node is still active */
3124         for (j=0; j<nodemap->num; j++) {
3125                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3126                         break;
3127                 }
3128         }
3129
3130         if (j == nodemap->num) {
3131                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3132                 force_election(rec, pnn, nodemap);
3133                 goto again;
3134         }
3135
3136         /* if recovery master is disconnected we must elect a new recmaster */
3137         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3138                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3139                 force_election(rec, pnn, nodemap);
3140                 goto again;
3141         }
3142
3143         /* grab the nodemap from the recovery master to check if it is banned */
3144         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3145                                    mem_ctx, &recmaster_nodemap);
3146         if (ret != 0) {
3147                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3148                           nodemap->nodes[j].pnn));
3149                 goto again;
3150         }
3151
3152
3153         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3154                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3155                 force_election(rec, pnn, nodemap);
3156                 goto again;
3157         }
3158
3159
3160         /* verify that we have all ip addresses we should have and we don't
3161          * have addresses we shouldn't have.
3162          */ 
3163         if (ctdb->do_checkpublicip) {
3164                 if (rec->ip_check_disable_ctx == NULL) {
3165                         if (verify_local_ip_allocation(ctdb, rec, pnn) != 0) {
3166                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3167                         }
3168                 }
3169         }
3170
3171
3172         /* if we are not the recmaster then we do not need to check
3173            if recovery is needed
3174          */
3175         if (pnn != rec->recmaster) {
3176                 goto again;
3177         }
3178
3179
3180         /* ensure our local copies of flags are right */
3181         ret = update_local_flags(rec, nodemap);
3182         if (ret == MONITOR_ELECTION_NEEDED) {
3183                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3184                 force_election(rec, pnn, nodemap);
3185                 goto again;
3186         }
3187         if (ret != MONITOR_OK) {
3188                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3189                 goto again;
3190         }
3191
3192         if (ctdb->num_nodes != nodemap->num) {
3193                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3194                 reload_nodes_file(ctdb);
3195                 goto again;
3196         }
3197
3198         /* verify that all active nodes agree that we are the recmaster */
3199         switch (verify_recmaster(rec, nodemap, pnn)) {
3200         case MONITOR_RECOVERY_NEEDED:
3201                 /* can not happen */
3202                 goto again;
3203         case MONITOR_ELECTION_NEEDED:
3204                 force_election(rec, pnn, nodemap);
3205                 goto again;
3206         case MONITOR_OK:
3207                 break;
3208         case MONITOR_FAILED:
3209                 goto again;
3210         }
3211
3212
3213         if (rec->need_recovery) {
3214                 /* a previous recovery didn't finish */
3215                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3216                 goto again;             
3217         }
3218
3219         /* verify that all active nodes are in normal mode 
3220            and not in recovery mode 
3221         */
3222         switch (verify_recmode(ctdb, nodemap)) {
3223         case MONITOR_RECOVERY_NEEDED:
3224                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3225                 goto again;
3226         case MONITOR_FAILED:
3227                 goto again;
3228         case MONITOR_ELECTION_NEEDED:
3229                 /* can not happen */
3230         case MONITOR_OK:
3231                 break;
3232         }
3233
3234
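             /* Note: check_recovery_lock() runs the event loop until the
                reclock child answers or its 15 second timeout fires, so slow
                reclock i/o delays this check rather than hanging the recovery
                daemon */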
3235         if (ctdb->tunable.verify_recovery_lock != 0) {
3236                 /* we should have the reclock - check it's not stale */
3237                 ret = check_recovery_lock(ctdb);
3238                 if (ret != 0) {
3239                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3240                         ctdb_set_culprit(rec, ctdb->pnn);
3241                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3242                         goto again;
3243                 }
3244         }
3245
3246         /* get the nodemap for all active remote nodes
3247          */
3248         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3249         if (remote_nodemaps == NULL) {
3250                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3251                 goto again;
3252         }
3253         for(i=0; i<nodemap->num; i++) {
3254                 remote_nodemaps[i] = NULL;
3255         }
3256         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3257                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3258                 goto again;
3259         } 
3260
3261         /* verify that all other nodes have the same nodemap as we have
3262         */
3263         for (j=0; j<nodemap->num; j++) {
3264                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3265                         continue;
3266                 }
3267
3268                 if (remote_nodemaps[j] == NULL) {
3269                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3270                         ctdb_set_culprit(rec, j);
3271
3272                         goto again;
3273                 }
3274
3275                 /* if the nodes disagree on how many nodes there are
3276                    then this is a good reason to try recovery
3277                  */
3278                 if (remote_nodemaps[j]->num != nodemap->num) {
3279                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3280                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3281                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3282                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3283                         goto again;
3284                 }
3285
3286                 /* if the nodes disagree on which nodes exist and are
3287                    active, then that is also a good reason to do recovery
3288                  */
3289                 for (i=0;i<nodemap->num;i++) {
3290                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3291                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3292                                           nodemap->nodes[j].pnn, i, 
3293                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3294                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3295                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3296                                             vnnmap);
3297                                 goto again;
3298                         }
3299                 }
3300
3301                 /* verify the flags are consistent
3302                 */
3303                 for (i=0; i<nodemap->num; i++) {
3304                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3305                                 continue;
3306                         }
3307                         
3308                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3309                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3310                                   nodemap->nodes[j].pnn, 
3311                                   nodemap->nodes[i].pnn, 
3312                                   remote_nodemaps[j]->nodes[i].flags,
3313                                   nodemap->nodes[i].flags));
3314                                 if (i == j) {
3315                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3316                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3317                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3318                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3319                                                     vnnmap);
3320                                         goto again;
3321                                 } else {
3322                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3323                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3324                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3325                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3326                                                     vnnmap);
3327                                         goto again;
3328                                 }
3329                         }
3330                 }
3331         }
3332
3333
3334         /* there had better be the same number of lmasters in the vnn map
3335            as there are active nodes, or we will have to do a recovery
3336          */
3337         if (vnnmap->size != rec->num_active) {
3338                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3339                           vnnmap->size, rec->num_active));
3340                 ctdb_set_culprit(rec, ctdb->pnn);
3341                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3342                 goto again;
3343         }
3344
3345         /* verify that all active nodes in the nodemap also exist in 
3346            the vnnmap.
3347          */
3348         for (j=0; j<nodemap->num; j++) {
3349                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3350                         continue;
3351                 }
3352                 if (nodemap->nodes[j].pnn == pnn) {
3353                         continue;
3354                 }
3355
3356                 for (i=0; i<vnnmap->size; i++) {
3357                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3358                                 break;
3359                         }
3360                 }
3361                 if (i == vnnmap->size) {
3362                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3363                                   nodemap->nodes[j].pnn));
3364                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3365                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3366                         goto again;
3367                 }
3368         }
3369
3370         
3371         /* verify that all other nodes have the same vnnmap
3372            and are from the same generation
3373          */
3374         for (j=0; j<nodemap->num; j++) {
3375                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3376                         continue;
3377                 }
3378                 if (nodemap->nodes[j].pnn == pnn) {
3379                         continue;
3380                 }
3381
3382                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3383                                           mem_ctx, &remote_vnnmap);
3384                 if (ret != 0) {
3385                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3386                                   nodemap->nodes[j].pnn));
3387                         goto again;
3388                 }
3389
3390                 /* verify the vnnmap generation is the same */
3391                 if (vnnmap->generation != remote_vnnmap->generation) {
3392                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3393                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3394                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3395                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3396                         goto again;
3397                 }
3398
3399                 /* verify the vnnmap size is the same */
3400                 if (vnnmap->size != remote_vnnmap->size) {
3401                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3402                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3403                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3404                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3405                         goto again;
3406                 }
3407
3408                 /* verify the vnnmap is the same */
3409                 for (i=0;i<vnnmap->size;i++) {
3410                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3411                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3412                                           nodemap->nodes[j].pnn));
3413                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3414                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3415                                             vnnmap);
3416                                 goto again;
3417                         }
3418                 }
3419         }
3420
3421         /* we might need to change who has what IP assigned */
3422         if (rec->need_takeover_run) {
3423                 uint32_t culprit = (uint32_t)-1;
3424
3425                 rec->need_takeover_run = false;
3426
3427                 /* update the list of public ips that a node can handle for
3428                    all connected nodes
3429                 */
3430                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3431                 if (ret != 0) {
3432                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3433                                          culprit));
3434                         ctdb_set_culprit(rec, culprit);
3435                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3436                         goto again;
3437                 }
3438
3439                 /* execute the "startrecovery" event script on all nodes */
3440                 ret = run_startrecovery_eventscript(rec, nodemap);
3441                 if (ret!=0) {
3442                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3443                         ctdb_set_culprit(rec, ctdb->pnn);
3444                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3445                         goto again;
3446                 }
3447
3448                 ret = ctdb_takeover_run(ctdb, nodemap);
3449                 if (ret != 0) {
3450                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3451                         ctdb_set_culprit(rec, ctdb->pnn);
3452                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3453                         goto again;
3454                 }
3455
3456                 /* execute the "recovered" event script on all nodes */
3457                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3458 #if 0
3459 // we can't check whether the event completed successfully
3460 // since this script WILL fail if the node is in recovery mode
3461 // and if that race happens, the code here would just cause a second
3462 // cascading recovery.
3463                 if (ret!=0) {
3464                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3465                         ctdb_set_culprit(rec, ctdb->pnn);
3466                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3467                 }
3468 #endif
3469         }
3470
3471
3472         goto again;
3473
3474 }
3475
3476 /*
3477   event handler for when the main ctdbd dies
3478  */
3479 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3480                                  uint16_t flags, void *private_data)
3481 {
3482         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3483         _exit(1);
3484 }
3485
3486 /*
3487   called regularly to verify that the recovery daemon is still running
3488  */
3489 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3490                               struct timeval yt, void *p)
3491 {
3492         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3493
3494         if (kill(ctdb->recoverd_pid, 0) != 0) {
3495                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3496
3497                 ctdb_stop_recoverd(ctdb);
3498                 ctdb_stop_keepalive(ctdb);
3499                 ctdb_stop_monitoring(ctdb);
3500                 ctdb_release_all_ips(ctdb);
3501                 if (ctdb->methods != NULL) {
3502                         ctdb->methods->shutdown(ctdb);
3503                 }
3504                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3505
3506                 exit(10);       
3507         }
3508
3509         event_add_timed(ctdb->ev, ctdb, 
3510                         timeval_current_ofs(30, 0),
3511                         ctdb_check_recd, ctdb);
3512 }
3513
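     /*
       reap any exited child processes (such as the reclock check child) so
       they do not linger as zombies
      */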
3514 static void recd_sig_child_handler(struct event_context *ev,
3515         struct signal_event *se, int signum, int count,
3516         void *dont_care, 
3517         void *private_data)
3518 {
3519 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3520         int status;
3521         pid_t pid = -1;
3522
3523         while (pid != 0) {
3524                 pid = waitpid(-1, &status, WNOHANG);
3525                 if (pid == -1) {
3526                         if (errno != ECHILD) {
3527                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3528                         }
3529                         return;
3530                 }
3531                 if (pid > 0) {
3532                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3533                 }
3534         }
3535 }
3536
3537 /*
3538   start up the recovery daemon as a child of the main ctdb daemon
3539  */
3540 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3541 {
3542         int fd[2];
3543         struct signal_event *se;
3544
3545         if (pipe(fd) != 0) {
3546                 return -1;
3547         }
3548
3549         ctdb->ctdbd_pid = getpid();
3550
3551         ctdb->recoverd_pid = fork();
3552         if (ctdb->recoverd_pid == -1) {
3553                 return -1;
3554         }
3555         
3556         if (ctdb->recoverd_pid != 0) {
3557                 close(fd[0]);
3558                 event_add_timed(ctdb->ev, ctdb, 
3559                                 timeval_current_ofs(30, 0),
3560                                 ctdb_check_recd, ctdb);
3561                 return 0;
3562         }
3563
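             /* child (the recovery daemon): keep fd[0] open.  When the parent
                exits, the other end of the pipe is closed and fd[0] becomes
                readable, which triggers ctdb_recoverd_parent() above and makes
                this process exit */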
3564         close(fd[1]);
3565
3566         srandom(getpid() ^ time(NULL));
3567
3568         if (switch_from_server_to_client(ctdb) != 0) {
3569                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3570                 exit(1);
3571         }
3572
3573         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3574
3575         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3576                      ctdb_recoverd_parent, &fd[0]);     
3577
3578         /* set up a handler to pick up sigchld */
3579         se = event_add_signal(ctdb->ev, ctdb,
3580                                      SIGCHLD, 0,
3581                                      recd_sig_child_handler,
3582                                      ctdb);
3583         if (se == NULL) {
3584                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3585                 exit(1);
3586         }
3587
3588         monitor_cluster(ctdb);
3589
3590         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3591         return -1;
3592 }
3593
3594 /*
3595   shut down the recovery daemon
3596  */
3597 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3598 {
3599         if (ctdb->recoverd_pid == 0) {
3600                 return;
3601         }
3602
3603         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3604         kill(ctdb->recoverd_pid, SIGTERM);
3605 }