metze/ctdb/wip.git: server/ctdb_recoverd.c (commit 5639d5b2f44f3bba6be0a232e5cb9ad97ef4b4ff)
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
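   /* timeouts for controls and for the monitor loop, taken from the recovery tunables */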
70 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
71 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
72
73
74 /*
75   ban a node for a period of time
76  */
77 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
78 {
79         int ret;
80         struct ctdb_context *ctdb = rec->ctdb;
81         struct ctdb_ban_time bantime;
82        
83         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
84
85         if (!ctdb_validate_pnn(ctdb, pnn)) {
86                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
87                 return;
88         }
89
90         bantime.pnn  = pnn;
91         bantime.time = ban_time;
92
93         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
94         if (ret != 0) {
95                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
96                 return;
97         }
98
99 }
100
101 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
102
103
104 /*
105   run the "recovered" eventscript on all nodes
106  */
107 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
108 {
109         TALLOC_CTX *tmp_ctx;
110         uint32_t *nodes;
111
112         tmp_ctx = talloc_new(ctdb);
113         CTDB_NO_MEMORY(ctdb, tmp_ctx);
114
115         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
116         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
117                                         nodes, 0,
118                                         CONTROL_TIMEOUT(), false, tdb_null,
119                                         NULL, NULL,
120                                         NULL) != 0) {
121                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
122
123                 talloc_free(tmp_ctx);
124                 return -1;
125         }
126
127         talloc_free(tmp_ctx);
128         return 0;
129 }
130
131 /*
132   remember the trouble maker
133  */
134 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
135 {
136         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
137         struct ctdb_banning_state *ban_state;
138
139         if (culprit >= ctdb->num_nodes) {
140                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
141                 return;
142         }
143
144         if (ctdb->nodes[culprit]->ban_state == NULL) {
145                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
146                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
147
148                 
149         }
150         ban_state = ctdb->nodes[culprit]->ban_state;
151         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
152                 /* this was the first time in a long while this node
153                    misbehaved so we will forgive any old transgressions.
154                 */
155                 ban_state->count = 0;
156         }
157
158         ban_state->count += count;
159         ban_state->last_reported_time = timeval_current();
160         rec->last_culprit_node = culprit;
161 }
162
163 /*
164   remember the trouble maker
165  */
166 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
167 {
168         ctdb_set_culprit_count(rec, culprit, 1);
169 }
170
171
172 /* this callback is called for every node that failed to execute the
173    start recovery event
174 */
175 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
176 {
177         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
178
179         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
180
181         ctdb_set_culprit(rec, node_pnn);
182 }
183
184 /*
185   run the "startrecovery" eventscript on all nodes
186  */
187 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
188 {
189         TALLOC_CTX *tmp_ctx;
190         uint32_t *nodes;
191         struct ctdb_context *ctdb = rec->ctdb;
192
193         tmp_ctx = talloc_new(ctdb);
194         CTDB_NO_MEMORY(ctdb, tmp_ctx);
195
196         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
198                                         nodes, 0,
199                                         CONTROL_TIMEOUT(), false, tdb_null,
200                                         NULL,
201                                         startrecovery_fail_callback,
202                                         rec) != 0) {
203                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
204                 talloc_free(tmp_ctx);
205                 return -1;
206         }
207
208         talloc_free(tmp_ctx);
209         return 0;
210 }
211
212 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
213 {
214         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
215                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
216                 return;
217         }
218         if (node_pnn < ctdb->num_nodes) {
219                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
220         }
221 }
222
223 /*
224   update the node capabilities for all connected nodes
225  */
226 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
227 {
228         uint32_t *nodes;
229         TALLOC_CTX *tmp_ctx;
230
231         tmp_ctx = talloc_new(ctdb);
232         CTDB_NO_MEMORY(ctdb, tmp_ctx);
233
234         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
235         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
236                                         nodes, 0,
237                                         CONTROL_TIMEOUT(),
238                                         false, tdb_null,
239                                         async_getcap_callback, NULL,
240                                         NULL) != 0) {
241                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
242                 talloc_free(tmp_ctx);
243                 return -1;
244         }
245
246         talloc_free(tmp_ctx);
247         return 0;
248 }
249
250 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
251 {
252         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
253
254         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
255         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
256 }
257
258 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
259 {
260         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
261
262         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
263         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
264 }
265
266 /*
267   change recovery mode on all nodes
268  */
269 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
270 {
271         TDB_DATA data;
272         uint32_t *nodes;
273         TALLOC_CTX *tmp_ctx;
274
275         tmp_ctx = talloc_new(ctdb);
276         CTDB_NO_MEMORY(ctdb, tmp_ctx);
277
278         /* freeze all nodes */
279         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
280         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
281                 int i;
282
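                    /* freeze the databases one priority level at a time on all active nodes */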
283                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
284                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
285                                                 nodes, i,
286                                                 CONTROL_TIMEOUT(),
287                                                 false, tdb_null,
288                                                 NULL,
289                                                 set_recmode_fail_callback,
290                                                 rec) != 0) {
291                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
292                                 talloc_free(tmp_ctx);
293                                 return -1;
294                         }
295                 }
296         }
297
298
299         data.dsize = sizeof(uint32_t);
300         data.dptr = (unsigned char *)&rec_mode;
301
302         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
303                                         nodes, 0,
304                                         CONTROL_TIMEOUT(),
305                                         false, data,
306                                         NULL, NULL,
307                                         NULL) != 0) {
308                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
309                 talloc_free(tmp_ctx);
310                 return -1;
311         }
312
313         talloc_free(tmp_ctx);
314         return 0;
315 }
316
317 /*
318   change recovery master on all nodes
319  */
320 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
321 {
322         TDB_DATA data;
323         TALLOC_CTX *tmp_ctx;
324         uint32_t *nodes;
325
326         tmp_ctx = talloc_new(ctdb);
327         CTDB_NO_MEMORY(ctdb, tmp_ctx);
328
329         data.dsize = sizeof(uint32_t);
330         data.dptr = (unsigned char *)&pnn;
331
332         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
333         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
334                                         nodes, 0,
335                                         CONTROL_TIMEOUT(), false, data,
336                                         NULL, NULL,
337                                         NULL) != 0) {
338                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
339                 talloc_free(tmp_ctx);
340                 return -1;
341         }
342
343         talloc_free(tmp_ctx);
344         return 0;
345 }
346
347 /* update all remote nodes to use the same db priority that we have.
348    This can fail if the remote node has not yet been upgraded to
349    support this function, so we always return success and never fail
350    a recovery if this call fails.
351 */
352 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
353         struct ctdb_node_map *nodemap, 
354         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
355 {
356         int db;
357         uint32_t *nodes;
358
359         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
360
361         /* step through all local databases */
362         for (db=0; db<dbmap->num;db++) {
363                 TDB_DATA data;
364                 struct ctdb_db_priority db_prio;
365                 int ret;
366
367                 db_prio.db_id     = dbmap->dbs[db].dbid;
368                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
369                 if (ret != 0) {
370                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
371                         continue;
372                 }
373
374                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
375
376                 data.dptr  = (uint8_t *)&db_prio;
377                 data.dsize = sizeof(db_prio);
378
379                 if (ctdb_client_async_control(ctdb,
380                                         CTDB_CONTROL_SET_DB_PRIORITY,
381                                         nodes, 0,
382                                         CONTROL_TIMEOUT(), false, data,
383                                         NULL, NULL,
384                                         NULL) != 0) {
385                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
386                 }
387         }
388
389         return 0;
390 }                       
391
392 /*
393   ensure all other nodes have attached to any databases that we have
394  */
395 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
396                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
397 {
398         int i, j, db, ret;
399         struct ctdb_dbid_map *remote_dbmap;
400
401         /* verify that all other nodes have all our databases */
402         for (j=0; j<nodemap->num; j++) {
403                 /* we don't need to check ourselves */
404                 if (nodemap->nodes[j].pnn == pnn) {
405                         continue;
406                 }
407                 /* dont check nodes that are unavailable */
408                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
409                         continue;
410                 }
411
412                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
413                                          mem_ctx, &remote_dbmap);
414                 if (ret != 0) {
415                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
416                         return -1;
417                 }
418
419                 /* step through all local databases */
420                 for (db=0; db<dbmap->num;db++) {
421                         const char *name;
422
423
424                         for (i=0;i<remote_dbmap->num;i++) {
425                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
426                                         break;
427                                 }
428                         }
429                         /* the remote node already has this database */
430                         if (i!=remote_dbmap->num) {
431                                 continue;
432                         }
433                         /* ok so we need to create this database */
434                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
435                                             mem_ctx, &name);
436                         if (ret != 0) {
437                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
438                                 return -1;
439                         }
440                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
441                                            mem_ctx, name, dbmap->dbs[db].persistent);
442                         if (ret != 0) {
443                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
444                                 return -1;
445                         }
446                 }
447         }
448
449         return 0;
450 }
451
452
453 /*
454   ensure we are attached to any databases that anyone else is attached to
455  */
456 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
457                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
458 {
459         int i, j, db, ret;
460         struct ctdb_dbid_map *remote_dbmap;
461
462         /* verify that we have all databases any other node has */
463         for (j=0; j<nodemap->num; j++) {
464                 /* we don't need to check ourselves */
465                 if (nodemap->nodes[j].pnn == pnn) {
466                         continue;
467                 }
468                 /* dont check nodes that are unavailable */
469                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
470                         continue;
471                 }
472
473                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
474                                          mem_ctx, &remote_dbmap);
475                 if (ret != 0) {
476                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
477                         return -1;
478                 }
479
480                 /* step through all databases on the remote node */
481                 for (db=0; db<remote_dbmap->num;db++) {
482                         const char *name;
483
484                         for (i=0;i<(*dbmap)->num;i++) {
485                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
486                                         break;
487                                 }
488                         }
489                         /* we already have this db locally */
490                         if (i!=(*dbmap)->num) {
491                                 continue;
492                         }
493                         /* ok so we need to create this database and
494                            rebuild dbmap
495                          */
496                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
497                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
498                         if (ret != 0) {
499                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
500                                           nodemap->nodes[j].pnn));
501                                 return -1;
502                         }
503                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
504                                            remote_dbmap->dbs[db].persistent);
505                         if (ret != 0) {
506                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
507                                 return -1;
508                         }
509                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
510                         if (ret != 0) {
511                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
512                                 return -1;
513                         }
514                 }
515         }
516
517         return 0;
518 }
519
520
521 /*
522   pull the remote database contents from one node into the recdb
523  */
524 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
525                                     struct tdb_wrap *recdb, uint32_t dbid,
526                                     bool persistent)
527 {
528         int ret;
529         TDB_DATA outdata;
530         struct ctdb_marshall_buffer *reply;
531         struct ctdb_rec_data *rec;
532         int i;
533         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
534
535         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
536                                CONTROL_TIMEOUT(), &outdata);
537         if (ret != 0) {
538                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
539                 talloc_free(tmp_ctx);
540                 return -1;
541         }
542
543         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
544
545         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
546                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
547                 talloc_free(tmp_ctx);
548                 return -1;
549         }
550         
551         rec = (struct ctdb_rec_data *)&reply->data[0];
552         
553         for (i=0;
554              i<reply->count;
555              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
556                 TDB_DATA key, data;
557                 struct ctdb_ltdb_header *hdr;
558                 TDB_DATA existing;
559                 
560                 key.dptr = &rec->data[0];
561                 key.dsize = rec->keylen;
562                 data.dptr = &rec->data[key.dsize];
563                 data.dsize = rec->datalen;
564                 
565                 hdr = (struct ctdb_ltdb_header *)data.dptr;
566
567                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
568                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
569                         talloc_free(tmp_ctx);
570                         return -1;
571                 }
572
573                 /* fetch the existing record, if any */
574                 existing = tdb_fetch(recdb->tdb, key);
575                 
576                 if (existing.dptr != NULL) {
577                         struct ctdb_ltdb_header header;
578                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
579                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
580                                          (unsigned)existing.dsize, srcnode));
581                                 free(existing.dptr);
582                                 talloc_free(tmp_ctx);
583                                 return -1;
584                         }
585                         header = *(struct ctdb_ltdb_header *)existing.dptr;
586                         free(existing.dptr);
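                            /* only take the remote record if it has a higher rsn, or the same rsn while the existing dmaster is not the recovery master */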
587                         if (!(header.rsn < hdr->rsn ||
588                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
589                                 continue;
590                         }
591                 }
592                 
593                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
594                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
595                         talloc_free(tmp_ctx);
596                         return -1;                              
597                 }
598         }
599
600         talloc_free(tmp_ctx);
601
602         return 0;
603 }
604
605 /*
606   pull all the remote database contents into the recdb
607  */
608 static int pull_remote_database(struct ctdb_context *ctdb,
609                                 struct ctdb_recoverd *rec, 
610                                 struct ctdb_node_map *nodemap, 
611                                 struct tdb_wrap *recdb, uint32_t dbid,
612                                 bool persistent)
613 {
614         int j;
615
616         /* pull all records from all other nodes across onto this node
617            (this merges based on rsn)
618         */
619         for (j=0; j<nodemap->num; j++) {
620                 /* dont merge from nodes that are unavailable */
621                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
622                         continue;
623                 }
624                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
625                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
626                                  nodemap->nodes[j].pnn));
627                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
628                         return -1;
629                 }
630         }
631         
632         return 0;
633 }
634
635
636 /*
637   update flags on all active nodes
638  */
639 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
640 {
641         int ret;
642
643         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
644         if (ret != 0) {
645                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
646                 return -1;
647         }
648
649         return 0;
650 }
651
652 /*
653   ensure all nodes have the same vnnmap we do
654  */
655 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
656                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
657 {
658         int j, ret;
659
660         /* push the new vnn map out to all the nodes */
661         for (j=0; j<nodemap->num; j++) {
662                 /* dont push to nodes that are unavailable */
663                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
664                         continue;
665                 }
666
667                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
668                 if (ret != 0) {
669                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
670                         return -1;
671                 }
672         }
673
674         return 0;
675 }
676
677
678 struct vacuum_info {
679         struct vacuum_info *next, *prev;
680         struct ctdb_recoverd *rec;
681         uint32_t srcnode;
682         struct ctdb_db_context *ctdb_db;
683         struct ctdb_marshall_buffer *recs;
684         struct ctdb_rec_data *r;
685 };
686
687 static void vacuum_fetch_next(struct vacuum_info *v);
688
689 /*
690   called when a vacuum fetch has completed - just free it and do the next one
691  */
692 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
693 {
694         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
695         talloc_free(state);
696         vacuum_fetch_next(v);
697 }
698
699
700 /*
701   process the next element from the vacuum list
702 */
703 static void vacuum_fetch_next(struct vacuum_info *v)
704 {
705         struct ctdb_call call;
706         struct ctdb_rec_data *r;
707
708         while (v->recs->count) {
709                 struct ctdb_client_call_state *state;
710                 TDB_DATA data;
711                 struct ctdb_ltdb_header *hdr;
712
713                 ZERO_STRUCT(call);
714                 call.call_id = CTDB_NULL_FUNC;
715                 call.flags = CTDB_IMMEDIATE_MIGRATION;
716
717                 r = v->r;
718                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
719                 v->recs->count--;
720
721                 call.key.dptr = &r->data[0];
722                 call.key.dsize = r->keylen;
723
724                 /* ensure we don't block this daemon - just skip a record if we can't get
725                    the chainlock */
726                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
727                         continue;
728                 }
729
730                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
731                 if (data.dptr == NULL) {
732                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
733                         continue;
734                 }
735
736                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
737                         free(data.dptr);
738                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
739                         continue;
740                 }
741                 
742                 hdr = (struct ctdb_ltdb_header *)data.dptr;
743                 if (hdr->dmaster == v->rec->ctdb->pnn) {
744                         /* its already local */
745                         free(data.dptr);
746                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
747                         continue;
748                 }
749
750                 free(data.dptr);
751
752                 state = ctdb_call_send(v->ctdb_db, &call);
753                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
754                 if (state == NULL) {
755                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
756                         talloc_free(v);
757                         return;
758                 }
759                 state->async.fn = vacuum_fetch_callback;
760                 state->async.private_data = v;
761                 return;
762         }
763
764         talloc_free(v);
765 }
766
767
768 /*
769   destroy a vacuum info structure
770  */
771 static int vacuum_info_destructor(struct vacuum_info *v)
772 {
773         DLIST_REMOVE(v->rec->vacuum_info, v);
774         return 0;
775 }
776
777
778 /*
779   handler for vacuum fetch
780 */
781 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
782                                  TDB_DATA data, void *private_data)
783 {
784         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
785         struct ctdb_marshall_buffer *recs;
786         int ret, i;
787         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
788         const char *name;
789         struct ctdb_dbid_map *dbmap=NULL;
790         bool persistent = false;
791         struct ctdb_db_context *ctdb_db;
792         struct ctdb_rec_data *r;
793         uint32_t srcnode;
794         struct vacuum_info *v;
795
796         recs = (struct ctdb_marshall_buffer *)data.dptr;
797         r = (struct ctdb_rec_data *)&recs->data[0];
798
799         if (recs->count == 0) {
800                 talloc_free(tmp_ctx);
801                 return;
802         }
803
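             /* the source node is passed in the reqid field of the first record */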
804         srcnode = r->reqid;
805
806         for (v=rec->vacuum_info;v;v=v->next) {
807                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
808                         /* we're already working on records from this node */
809                         talloc_free(tmp_ctx);
810                         return;
811                 }
812         }
813
814         /* work out if the database is persistent */
815         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
816         if (ret != 0) {
817                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
818                 talloc_free(tmp_ctx);
819                 return;
820         }
821
822         for (i=0;i<dbmap->num;i++) {
823                 if (dbmap->dbs[i].dbid == recs->db_id) {
824                         persistent = dbmap->dbs[i].persistent;
825                         break;
826                 }
827         }
828         if (i == dbmap->num) {
829                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
830                 talloc_free(tmp_ctx);
831                 return;         
832         }
833
834         /* find the name of this database */
835         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
836                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
837                 talloc_free(tmp_ctx);
838                 return;
839         }
840
841         /* attach to it */
842         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
843         if (ctdb_db == NULL) {
844                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
845                 talloc_free(tmp_ctx);
846                 return;
847         }
848
849         v = talloc_zero(rec, struct vacuum_info);
850         if (v == NULL) {
851                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
852                 talloc_free(tmp_ctx);
853                 return;
854         }
855
856         v->rec = rec;
857         v->srcnode = srcnode;
858         v->ctdb_db = ctdb_db;
859         v->recs = talloc_memdup(v, recs, data.dsize);
860         if (v->recs == NULL) {
861                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
862                 talloc_free(v);
863                 talloc_free(tmp_ctx);
864                 return;         
865         }
866         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
867
868         DLIST_ADD(rec->vacuum_info, v);
869
870         talloc_set_destructor(v, vacuum_info_destructor);
871
872         vacuum_fetch_next(v);
873         talloc_free(tmp_ctx);
874 }
875
876
877 /*
878   called when ctdb_wait_timeout should finish
879  */
880 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
881                               struct timeval yt, void *p)
882 {
883         uint32_t *timed_out = (uint32_t *)p;
884         (*timed_out) = 1;
885 }
886
887 /*
888   wait for a given number of seconds
889  */
890 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
891 {
892         uint32_t timed_out = 0;
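             /* split secs into whole seconds and a fractional part in microseconds */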
893         time_t usecs = (secs - (time_t)secs) * 1000000;
894         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
895         while (!timed_out) {
896                 event_loop_once(ctdb->ev);
897         }
898 }
899
900 /*
901   called when an election times out (ends)
902  */
903 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
904                                   struct timeval t, void *p)
905 {
906         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
907         rec->election_timeout = NULL;
908         fast_start = false;
909
910         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
911 }
912
913
914 /*
915   wait for an election to finish. It finishes election_timeout seconds after
916   the last election packet is received
917  */
918 static void ctdb_wait_election(struct ctdb_recoverd *rec)
919 {
920         struct ctdb_context *ctdb = rec->ctdb;
921         while (rec->election_timeout) {
922                 event_loop_once(ctdb->ev);
923         }
924 }
925
926 /*
927   Update our local flags from all remote connected nodes. 
928   This is only run when we are, or believe we are, the recovery master
929  */
930 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
931 {
932         int j;
933         struct ctdb_context *ctdb = rec->ctdb;
934         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
935
936         /* get the nodemap for all active remote nodes and verify
937            they are the same as for this node
938          */
939         for (j=0; j<nodemap->num; j++) {
940                 struct ctdb_node_map *remote_nodemap=NULL;
941                 int ret;
942
943                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
944                         continue;
945                 }
946                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
947                         continue;
948                 }
949
950                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
951                                            mem_ctx, &remote_nodemap);
952                 if (ret != 0) {
953                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
954                                   nodemap->nodes[j].pnn));
955                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
956                         talloc_free(mem_ctx);
957                         return MONITOR_FAILED;
958                 }
959                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
960                         /* We should tell our daemon about this so it
961                            updates its flags or else we will log the same 
962                            message again in the next iteration of recovery.
963                            Since we are the recovery master we can just as
964                            well update the flags on all nodes.
965                         */
966                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
967                         if (ret != 0) {
968                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
969                                 talloc_free(mem_ctx);
                                return MONITOR_FAILED;
970                         }
971
972                         /* Update our local copy of the flags in the recovery
973                            daemon.
974                         */
975                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
976                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
977                                  nodemap->nodes[j].flags));
978                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
979                 }
980                 talloc_free(remote_nodemap);
981         }
982         talloc_free(mem_ctx);
983         return MONITOR_OK;
984 }
985
986
987 /* Create a new random generation id.
988    The generation id cannot be the INVALID_GENERATION id
989 */
990 static uint32_t new_generation(void)
991 {
992         uint32_t generation;
993
994         while (1) {
995                 generation = random();
996
997                 if (generation != INVALID_GENERATION) {
998                         break;
999                 }
1000         }
1001
1002         return generation;
1003 }
1004
1005
1006 /*
1007   create a temporary working database
1008  */
1009 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1010 {
1011         char *name;
1012         struct tdb_wrap *recdb;
1013         unsigned tdb_flags;
1014
1015         /* open up the temporary recovery database */
1016         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1017                                ctdb->db_directory_state,
1018                                ctdb->pnn);
1019         if (name == NULL) {
1020                 return NULL;
1021         }
1022         unlink(name);
1023
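              /* the recdb is only accessed by this process, so tdb locking is not needed */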
1024         tdb_flags = TDB_NOLOCK;
1025         if (ctdb->valgrinding) {
1026                 tdb_flags |= TDB_NOMMAP;
1027         }
1028         tdb_flags |= TDB_DISALLOW_NESTING;
1029
1030         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1031                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1032         if (recdb == NULL) {
1033                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1034         }
1035
1036         talloc_free(name);
1037
1038         return recdb;
1039 }
1040
1041
1042 /* 
1043    a traverse function for pulling all relevant records from recdb
1044  */
1045 struct recdb_data {
1046         struct ctdb_context *ctdb;
1047         struct ctdb_marshall_buffer *recdata;
1048         uint32_t len;
1049         bool failed;
1050         bool persistent;
1051 };
1052
1053 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1054 {
1055         struct recdb_data *params = (struct recdb_data *)p;
1056         struct ctdb_rec_data *rec;
1057         struct ctdb_ltdb_header *hdr;
1058
1059         /* skip empty records */
1060         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1061                 return 0;
1062         }
1063
1064         /* update the dmaster field to point to us */
1065         hdr = (struct ctdb_ltdb_header *)data.dptr;
1066         if (!params->persistent) {
1067                 hdr->dmaster = params->ctdb->pnn;
1068         }
1069
1070         /* add the record to the blob ready to send to the nodes */
1071         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1072         if (rec == NULL) {
1073                 params->failed = true;
1074                 return -1;
1075         }
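              /* grow the marshall buffer and append the new record to it */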
1076         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1077         if (params->recdata == NULL) {
1078                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1079                          rec->length + params->len, params->recdata->count));
1080                 params->failed = true;
1081                 return -1;
1082         }
1083         params->recdata->count++;
1084         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1085         params->len += rec->length;
1086         talloc_free(rec);
1087
1088         return 0;
1089 }
1090
1091 /*
1092   push the recdb database out to all nodes
1093  */
1094 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1095                                bool persistent,
1096                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1097 {
1098         struct recdb_data params;
1099         struct ctdb_marshall_buffer *recdata;
1100         TDB_DATA outdata;
1101         TALLOC_CTX *tmp_ctx;
1102         uint32_t *nodes;
1103
1104         tmp_ctx = talloc_new(ctdb);
1105         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1106
1107         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1108         CTDB_NO_MEMORY(ctdb, recdata);
1109
1110         recdata->db_id = dbid;
1111
1112         params.ctdb = ctdb;
1113         params.recdata = recdata;
1114         params.len = offsetof(struct ctdb_marshall_buffer, data);
1115         params.failed = false;
1116         params.persistent = persistent;
1117
1118         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1119                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1120                 talloc_free(params.recdata);
1121                 talloc_free(tmp_ctx);
1122                 return -1;
1123         }
1124
1125         if (params.failed) {
1126                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1127                 talloc_free(params.recdata);
1128                 talloc_free(tmp_ctx);
1129                 return -1;              
1130         }
1131
1132         recdata = params.recdata;
1133
1134         outdata.dptr = (void *)recdata;
1135         outdata.dsize = params.len;
1136
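              /* push the marshalled records out to all active nodes */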
1137         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1138         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1139                                         nodes, 0,
1140                                         CONTROL_TIMEOUT(), false, outdata,
1141                                         NULL, NULL,
1142                                         NULL) != 0) {
1143                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1144                 talloc_free(recdata);
1145                 talloc_free(tmp_ctx);
1146                 return -1;
1147         }
1148
1149         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1150                   dbid, recdata->count));
1151
1152         talloc_free(recdata);
1153         talloc_free(tmp_ctx);
1154
1155         return 0;
1156 }
1157
1158
1159 /*
1160   go through a full recovery on one database 
1161  */
1162 static int recover_database(struct ctdb_recoverd *rec, 
1163                             TALLOC_CTX *mem_ctx,
1164                             uint32_t dbid,
1165                             bool persistent,
1166                             uint32_t pnn, 
1167                             struct ctdb_node_map *nodemap,
1168                             uint32_t transaction_id)
1169 {
1170         struct tdb_wrap *recdb;
1171         int ret;
1172         struct ctdb_context *ctdb = rec->ctdb;
1173         TDB_DATA data;
1174         struct ctdb_control_wipe_database w;
1175         uint32_t *nodes;
1176
1177         recdb = create_recdb(ctdb, mem_ctx);
1178         if (recdb == NULL) {
1179                 return -1;
1180         }
1181
1182         /* pull all remote databases onto the recdb */
1183         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1184         if (ret != 0) {
1185                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1186                 return -1;
1187         }
1188
1189         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1190
1191         /* wipe all the remote databases. This is safe as we are in a transaction */
1192         w.db_id = dbid;
1193         w.transaction_id = transaction_id;
1194
1195         data.dptr = (void *)&w;
1196         data.dsize = sizeof(w);
1197
1198         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1199         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1200                                         nodes, 0,
1201                                         CONTROL_TIMEOUT(), false, data,
1202                                         NULL, NULL,
1203                                         NULL) != 0) {
1204                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1205                 talloc_free(recdb);
1206                 return -1;
1207         }
1208         
1209         /* push out the correct database. This sets the dmaster and skips 
1210            the empty records */
1211         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1212         if (ret != 0) {
1213                 talloc_free(recdb);
1214                 return -1;
1215         }
1216
1217         /* all done with this database */
1218         talloc_free(recdb);
1219
1220         return 0;
1221 }
1222
1223 /*
1224   reload the nodes file 
1225 */
1226 static void reload_nodes_file(struct ctdb_context *ctdb)
1227 {
1228         ctdb->nodes = NULL;
1229         ctdb_load_nodes_file(ctdb);
1230 }
1231
1232 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1233                                          struct ctdb_recoverd *rec,
1234                                          struct ctdb_node_map *nodemap,
1235                                          uint32_t *culprit)
1236 {
1237         int j;
1238         int ret;
1239
1240         if (ctdb->num_nodes != nodemap->num) {
1241                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1242                                   ctdb->num_nodes, nodemap->num));
1243                 if (culprit) {
1244                         *culprit = ctdb->pnn;
1245                 }
1246                 return -1;
1247         }
1248
1249         for (j=0; j<nodemap->num; j++) {
1250                 /* release any existing data */
1251                 if (ctdb->nodes[j]->known_public_ips) {
1252                         talloc_free(ctdb->nodes[j]->known_public_ips);
1253                         ctdb->nodes[j]->known_public_ips = NULL;
1254                 }
1255                 if (ctdb->nodes[j]->available_public_ips) {
1256                         talloc_free(ctdb->nodes[j]->available_public_ips);
1257                         ctdb->nodes[j]->available_public_ips = NULL;
1258                 }
1259
1260                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1261                         continue;
1262                 }
1263
1264                 /* grab a new shiny list of public ips from the node */
1265                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1266                                         CONTROL_TIMEOUT(),
1267                                         ctdb->nodes[j]->pnn,
1268                                         ctdb->nodes,
1269                                         0,
1270                                         &ctdb->nodes[j]->known_public_ips);
1271                 if (ret != 0) {
1272                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1273                                 ctdb->nodes[j]->pnn));
1274                         if (culprit) {
1275                                 *culprit = ctdb->nodes[j]->pnn;
1276                         }
1277                         return -1;
1278                 }
1279
1280                 if (ctdb->tunable.disable_ip_failover == 0) {
1281                         if (rec->ip_check_disable_ctx == NULL) {
1282                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1283                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1284                                         rec->need_takeover_run = true;
1285                                 }
1286                         }
1287                 }
1288
1289                 /* also grab the list of public ips that are currently available on the node */
1290                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1291                                         CONTROL_TIMEOUT(),
1292                                         ctdb->nodes[j]->pnn,
1293                                         ctdb->nodes,
1294                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1295                                         &ctdb->nodes[j]->available_public_ips);
1296                 if (ret != 0) {
1297                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1298                                 ctdb->nodes[j]->pnn));
1299                         if (culprit) {
1300                                 *culprit = ctdb->nodes[j]->pnn;
1301                         }
1302                         return -1;
1303                 }
1304         }
1305
1306         return 0;
1307 }
1308
1309 /* when we start a recovery, make sure all nodes use the same reclock file
1310    setting
1311 */
1312 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1313 {
1314         struct ctdb_context *ctdb = rec->ctdb;
1315         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1316         TDB_DATA data;
1317         uint32_t *nodes;
1318
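              /* a zero-length blob means that no recovery lock file is configured */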
1319         if (ctdb->recovery_lock_file == NULL) {
1320                 data.dptr  = NULL;
1321                 data.dsize = 0;
1322         } else {
1323                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1324                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1325         }
1326
1327         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1328         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1329                                         nodes, 0,
1330                                         CONTROL_TIMEOUT(),
1331                                         false, data,
1332                                         NULL, NULL,
1333                                         rec) != 0) {
1334                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1335                 talloc_free(tmp_ctx);
1336                 return -1;
1337         }
1338
1339         talloc_free(tmp_ctx);
1340         return 0;
1341 }
1342
1343
1344 /*
1345   we are the recmaster, and recovery is needed - start a recovery run
1346  */
1347 static int do_recovery(struct ctdb_recoverd *rec, 
1348                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1349                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1350 {
1351         struct ctdb_context *ctdb = rec->ctdb;
1352         int i, j, ret;
1353         uint32_t generation;
1354         struct ctdb_dbid_map *dbmap;
1355         TDB_DATA data;
1356         uint32_t *nodes;
1357         struct timeval start_time;
1358         uint32_t culprit = (uint32_t)-1;
1359
1360         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1361
1362         /* if recovery fails, force it again */
1363         rec->need_recovery = true;
1364
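              /* ban any node that has repeatedly caused recoveries */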
1365         for (i=0; i<ctdb->num_nodes; i++) {
1366                 struct ctdb_banning_state *ban_state;
1367
1368                 if (ctdb->nodes[i]->ban_state == NULL) {
1369                         continue;
1370                 }
1371                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1372                 if (ban_state->count < 2*ctdb->num_nodes) {
1373                         continue;
1374                 }
1375                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1376                         ctdb->nodes[i]->pnn, ban_state->count,
1377                         ctdb->tunable.recovery_ban_period));
1378                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1379                 ban_state->count = 0;
1380         }
1381
1382
1383         if (ctdb->tunable.verify_recovery_lock != 0) {
1384                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1385                 start_time = timeval_current();
1386                 if (!ctdb_recovery_lock(ctdb, true)) {
1387                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1388                                          "and ban ourself for %u seconds\n",
1389                                          ctdb->tunable.recovery_ban_period));
1390                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1391                         return -1;
1392                 }
1393                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1394                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1395         }
1396
1397         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1398
1399         /* get a list of all databases */
1400         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1401         if (ret != 0) {
1402                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
1403                 return -1;
1404         }
1405
1406         /* we do the db creation before we set the recovery mode, so the freeze happens
1407            on all databases we will be dealing with. */
1408
1409         /* verify that we have all the databases any other node has */
1410         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1411         if (ret != 0) {
1412                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1413                 return -1;
1414         }
1415
1416         /* verify that all other nodes have all our databases */
1417         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1418         if (ret != 0) {
1419                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1420                 return -1;
1421         }
1422         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1423
1424         /* update the database priority for all remote databases */
1425         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1426         if (ret != 0) {
1427                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1428         }
1429         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1430
1431
1432         /* update all other nodes to use the same setting for reclock files
1433            as the local recovery master.
1434         */
1435         sync_recovery_lock_file_across_cluster(rec);
1436
1437         /* set recovery mode to active on all nodes */
1438         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1439         if (ret != 0) {
1440                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1441                 return -1;
1442         }
1443
1444         /* execute the "startrecovery" event script on all nodes */
1445         ret = run_startrecovery_eventscript(rec, nodemap);
1446         if (ret!=0) {
1447                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1448                 return -1;
1449         }
1450
1451         /*
1452           update all nodes to have the same flags that we have
1453          */
1454         for (i=0;i<nodemap->num;i++) {
1455                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1456                         continue;
1457                 }
1458
1459                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1460                 if (ret != 0) {
1461                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1462                         return -1;
1463                 }
1464         }
1465
1466         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1467
1468         /* pick a new generation number */
1469         generation = new_generation();
1470
1471         /* change the vnnmap on this node to use the new generation
1472            number, but not on any other nodes.
1473            This guarantees that if we abort the recovery prematurely
1474            for some reason (e.g. a node stops responding) we can
1475            just return immediately and recovery will be re-entered
1476            shortly afterwards.
1477            I.e. we deliberately leave the cluster with an inconsistent
1478            generation id so that we can abort recovery at any stage
1479            and just restart it from scratch.
1480          */
1481         vnnmap->generation = generation;
1482         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1483         if (ret != 0) {
1484                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1485                 return -1;
1486         }
1487
1488         data.dptr = (void *)&generation;
1489         data.dsize = sizeof(uint32_t);
1490
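             /* Start a transaction on every active node, passing the new
                generation number as the control data.  If any node fails
                to start the transaction we try to cancel it on all of
                them and abort; the monitoring loop will then retry the
                recovery from scratch. */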
1491         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1492         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1493                                         nodes, 0,
1494                                         CONTROL_TIMEOUT(), false, data,
1495                                         NULL,
1496                                         transaction_start_fail_callback,
1497                                         rec) != 0) {
1498                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1499                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1500                                         nodes, 0,
1501                                         CONTROL_TIMEOUT(), false, tdb_null,
1502                                         NULL,
1503                                         NULL,
1504                                         NULL) != 0) {
1505                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1506                 }
1507                 return -1;
1508         }
1509
1510         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1511
1512         for (i=0;i<dbmap->num;i++) {
1513                 ret = recover_database(rec, mem_ctx,
1514                                        dbmap->dbs[i].dbid,
1515                                        dbmap->dbs[i].persistent,
1516                                        pnn, nodemap, generation);
1517                 if (ret != 0) {
1518                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1519                         return -1;
1520                 }
1521         }
1522
1523         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1524
1525         /* commit all the changes */
1526         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1527                                         nodes, 0,
1528                                         CONTROL_TIMEOUT(), false, data,
1529                                         NULL, NULL,
1530                                         NULL) != 0) {
1531                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1532                 return -1;
1533         }
1534
1535         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1536         
1537
1538         /* update the capabilities for all nodes */
1539         ret = update_capabilities(ctdb, nodemap);
1540         if (ret!=0) {
1541                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1542                 return -1;
1543         }
1544
1545         /* build a new vnn map with all the currently active and
1546            unbanned nodes */
1547         generation = new_generation();
1548         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1549         CTDB_NO_MEMORY(ctdb, vnnmap);
1550         vnnmap->generation = generation;
1551         vnnmap->size = 0;
1552         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1553         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1554         for (i=j=0;i<nodemap->num;i++) {
1555                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1556                         continue;
1557                 }
1558                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1559                         /* this node cannot be an lmaster */
1560                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1561                         continue;
1562                 }
1563
1564                 vnnmap->size++;
1565                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1566                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1567                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1568
1569         }
1570         if (vnnmap->size == 0) {
1571                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1572                 vnnmap->size++;
1573                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1574                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1575                 vnnmap->map[0] = pnn;
1576         }       
1577
1578         /* update to the new vnnmap on all nodes */
1579         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1580         if (ret != 0) {
1581                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1582                 return -1;
1583         }
1584
1585         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1586
1587         /* update recmaster to point to us for all nodes */
1588         ret = set_recovery_master(ctdb, nodemap, pnn);
1589         if (ret!=0) {
1590                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1591                 return -1;
1592         }
1593
1594         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1595
1596         /*
1597           update all nodes to have the same flags that we have
1598          */
1599         for (i=0;i<nodemap->num;i++) {
1600                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1601                         continue;
1602                 }
1603
1604                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1605                 if (ret != 0) {
1606                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1607                         return -1;
1608                 }
1609         }
1610
1611         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1612
1613         /* disable recovery mode */
1614         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1615         if (ret != 0) {
1616                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1617                 return -1;
1618         }
1619
1620         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1621
1622         /*
1623           tell nodes to takeover their public IPs
1624          */
1625         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1626         if (ret != 0) {
1627                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1628                                  culprit));
1629                 return -1;
1630         }
1631         rec->need_takeover_run = false;
1632         ret = ctdb_takeover_run(ctdb, nodemap);
1633         if (ret != 0) {
1634                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1635                 return -1;
1636         }
1637         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1638
1639         /* execute the "recovered" event script on all nodes */
1640         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1641         if (ret!=0) {
1642                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1643                 return -1;
1644         }
1645
1646         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1647
1648         /* send a message to all clients telling them that the cluster 
1649            has been reconfigured */
1650         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1651
1652         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1653
1654         rec->need_recovery = false;
1655
1656         /* we managed to complete a full recovery, make sure to forgive
1657            any past sins by the nodes that could now participate in the
1658            recovery.
1659         */
1660         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1661         for (i=0;i<nodemap->num;i++) {
1662                 struct ctdb_banning_state *ban_state;
1663
1664                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1665                         continue;
1666                 }
1667
1668                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1669                 if (ban_state == NULL) {
1670                         continue;
1671                 }
1672
1673                 ban_state->count = 0;
1674         }
1675
1676
1677         /* We just finished a recovery successfully. 
1678            We now wait for rerecovery_timeout before we allow 
1679            another recovery to take place.
1680         */
1681         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1682         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1683         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1684
1685         return 0;
1686 }
1687
1688
1689 /*
1690   elections are won by first checking the number of connected nodes, then
1691   the priority time, then the pnn
1692  */
1693 struct election_message {
1694         uint32_t num_connected;
1695         struct timeval priority_time;
1696         uint32_t pnn;
1697         uint32_t node_flags;
1698 };
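     /* This structure is sent as raw bytes via CTDB_SRVID_RECOVERY in
        send_election_request() and compared field by field in
        ctdb_election_win(), so all nodes must agree on its layout. */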
1699
1700 /*
1701   form this node's election data
1702  */
1703 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1704 {
1705         int ret, i;
1706         struct ctdb_node_map *nodemap;
1707         struct ctdb_context *ctdb = rec->ctdb;
1708
1709         ZERO_STRUCTP(em);
1710
1711         em->pnn = rec->ctdb->pnn;
1712         em->priority_time = rec->priority_time;
1713
1714         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1715         if (ret != 0) {
1716                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1717                 return;
1718         }
1719
1720         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1721         em->node_flags = rec->node_flags;
1722
1723         for (i=0;i<nodemap->num;i++) {
1724                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1725                         em->num_connected++;
1726                 }
1727         }
1728
1729         /* we shouldn't try to win this election if we can't be a recmaster */
1730         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1731                 em->num_connected = 0;
1732                 em->priority_time = timeval_current();
1733         }
1734
1735         talloc_free(nodemap);
1736 }
1737
1738 /*
1739   see if the given election data wins
1740  */
1741 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1742 {
1743         struct election_message myem;
1744         int cmp = 0;
1745
1746         ctdb_election_data(rec, &myem);
1747
1748         /* we can't win if we don't have the recmaster capability */
1749         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1750                 return false;
1751         }
1752
1753         /* we can't win if we are banned */
1754         if (rec->node_flags & NODE_FLAGS_BANNED) {
1755                 return false;
1756         }       
1757
1758         /* we can't win if we are stopped */
1759         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1760                 return false;
1761         }       
1762
1763         /* we will automatically win if the other node is banned */
1764         if (em->node_flags & NODE_FLAGS_BANNED) {
1765                 return true;
1766         }
1767
1768         /* we will automatically win if the other node is stopped */
1769         if (em->node_flags & NODE_FLAGS_STOPPED) {
1770                 return true;
1771         }
1772
1773         /* try to use the most connected node */
1774         if (cmp == 0) {
1775                 cmp = (int)myem.num_connected - (int)em->num_connected;
1776         }
1777
1778         /* then the longest running node */
1779         if (cmp == 0) {
1780                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1781         }
1782
1783         if (cmp == 0) {
1784                 cmp = (int)myem.pnn - (int)em->pnn;
1785         }
1786
1787         return cmp > 0;
1788 }
1789
1790 /*
1791   send out an election request
1792  */
1793 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1794 {
1795         int ret;
1796         TDB_DATA election_data;
1797         struct election_message emsg;
1798         uint64_t srvid;
1799         struct ctdb_context *ctdb = rec->ctdb;
1800
1801         srvid = CTDB_SRVID_RECOVERY;
1802
1803         ctdb_election_data(rec, &emsg);
1804
1805         election_data.dsize = sizeof(struct election_message);
1806         election_data.dptr  = (unsigned char *)&emsg;
1807
1808
1809         /* send an election message to all active nodes */
1810         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1811         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1812
1813
1814         /* A new node that is already frozen has entered the cluster.
1815            The existing nodes are not frozen and don't need to be frozen
1816            until the election has ended and we start the actual recovery
1817         */
1818         if (update_recmaster == true) {
1819                 /* first we assume we will win the election and set 
1820                    recoverymaster to be ourself on the current node
1821                  */
1822                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1823                 if (ret != 0) {
1824                         DEBUG(DEBUG_ERR, (__location__ " failed to set ourselves as recmaster on the local node\n"));
1825                         return -1;
1826                 }
1827         }
1828
1829
1830         return 0;
1831 }
1832
1833 /*
1834   this function will unban all nodes in the cluster
1835 */
1836 static void unban_all_nodes(struct ctdb_context *ctdb)
1837 {
1838         int ret, i;
1839         struct ctdb_node_map *nodemap;
1840         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1841         
1842         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1843         if (ret != 0) {
1844                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1845                 return;
1846         }
1847
1848         for (i=0;i<nodemap->num;i++) {
1849                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1850                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1851                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1852                 }
1853         }
1854
1855         talloc_free(tmp_ctx);
1856 }
1857
1858
1859 /*
1860   we think we are winning the election - send a broadcast election request
1861  */
1862 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1863 {
1864         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1865         int ret;
1866
1867         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1868         if (ret != 0) {
1869                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1870         }
1871
1872         talloc_free(rec->send_election_te);
1873         rec->send_election_te = NULL;
1874 }
1875
1876 /*
1877   handler for memory dumps
1878 */
1879 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1880                              TDB_DATA data, void *private_data)
1881 {
1882         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1883         TDB_DATA *dump;
1884         int ret;
1885         struct rd_memdump_reply *rd;
1886
1887         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1888                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1889                 talloc_free(tmp_ctx);
1890                 return;
1891         }
1892         rd = (struct rd_memdump_reply *)data.dptr;
1893
1894         dump = talloc_zero(tmp_ctx, TDB_DATA);
1895         if (dump == NULL) {
1896                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1897                 talloc_free(tmp_ctx);
1898                 return;
1899         }
1900         ret = ctdb_dump_memory(ctdb, dump);
1901         if (ret != 0) {
1902                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1903                 talloc_free(tmp_ctx);
1904                 return;
1905         }
1906
1907         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1908
1909         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1910         if (ret != 0) {
1911                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1912                 talloc_free(tmp_ctx);
1913                 return;
1914         }
1915
1916         talloc_free(tmp_ctx);
1917 }
1918
1919 /*
1920   handler for reload_nodes
1921 */
1922 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1923                              TDB_DATA data, void *private_data)
1924 {
1925         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1926
1927         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1928
1929         reload_nodes_file(rec->ctdb);
1930 }
1931
1932
1933 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1934                               struct timeval yt, void *p)
1935 {
1936         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1937
1938         talloc_free(rec->ip_check_disable_ctx);
1939         rec->ip_check_disable_ctx = NULL;
1940 }
1941
1942
1943 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1944                              TDB_DATA data, void *private_data)
1945 {
1946         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1947         struct ctdb_public_ip *ip;
1948
1949         if (rec->recmaster != rec->ctdb->pnn) {
1950                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1951                 return;
1952         }
1953
1954         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1955                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1956                 return;
1957         }
1958
1959         ip = (struct ctdb_public_ip *)data.dptr;
1960
1961         update_ip_assignment_tree(rec->ctdb, ip);
1962 }
1963
1964
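     /*
       handler for the "disable ip check" message: disable the local
       public ip consistency check for the requested number of seconds.
       The check is re-enabled either when the timed event fires
       (reenable_ip_check above) or when ip_check_disable_ctx is freed.
     */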
1965 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1966                              TDB_DATA data, void *private_data)
1967 {
1968         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1969         uint32_t timeout;
1970
1971         if (rec->ip_check_disable_ctx != NULL) {
1972                 talloc_free(rec->ip_check_disable_ctx);
1973                 rec->ip_check_disable_ctx = NULL;
1974         }
1975
1976         if (data.dsize != sizeof(uint32_t)) {
1977                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1978                                  "expecting %lu\n", (long unsigned)data.dsize,
1979                                  (long unsigned)sizeof(uint32_t)));
1980                 return;
1981         }
1982         if (data.dptr == NULL) {
1983                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1984                 return;
1985         }
1986
1987         timeout = *((uint32_t *)data.dptr);
1988         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1989
1990         rec->ip_check_disable_ctx = talloc_new(rec);
1991         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1992
1993         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1994 }
1995
1996
1997 /*
1998   handler for ip reallocate, just add it to the list of callers and 
1999   handle this later in the monitor_cluster loop so we do not recurse
2000   with other callers to takeover_run()
2001 */
2002 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2003                              TDB_DATA data, void *private_data)
2004 {
2005         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2006         struct ip_reallocate_list *caller;
2007
2008         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2009                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2010                 return;
2011         }
2012
2013         if (rec->ip_reallocate_ctx == NULL) {
2014                 rec->ip_reallocate_ctx = talloc_new(rec);
2015                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2016         }
2017
2018         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2019         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2020
2021         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2022         caller->next = rec->reallocate_callers;
2023         rec->reallocate_callers = caller;
2024
2025         return;
2026 }
2027
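     /*
       process all the queued "ctdb ipreallocate" requests in one go:
       refresh the public ip information from all connected nodes, do a
       single takeover run and then send the result back to every caller
       that asked for a reply (srvid != 0).  If anything fails we
       remember that a takeover run is still needed so the monitoring
       loop will retry it.
     */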
2028 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2029 {
2030         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2031         TDB_DATA result;
2032         int32_t ret, ret2;
2033         struct ip_reallocate_list *callers;
2034         uint32_t culprit;
2035
2036         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2037
2038         /* update the list of public ips that a node can handle for
2039            all connected nodes
2040         */
2041         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2042         if (ret != 0) {
2043                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2044                                  culprit));
2045                 rec->need_takeover_run = true;
2046         }
2047         if (ret == 0) {
2048                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2049                 if (ret != 0) {
2050                         DEBUG(DEBUG_ERR,("Failed to reassign public ip addresses: "
2051                                          "ctdb_takeover_run() failed\n"));
2052                         rec->need_takeover_run = true;
2053                 }
2054         }
2055
2056         result.dsize = sizeof(int32_t);
2057         result.dptr  = (uint8_t *)&ret;
2058
2059         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2060
2061                 /* Someone that sent srvid==0 does not want a reply */
2062                 if (callers->rd->srvid == 0) {
2063                         continue;
2064                 }
2065                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2066                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2067                                   (unsigned long long)callers->rd->srvid));
                     /* use a separate variable for the send status so we
                        do not overwrite "ret", which result.dptr points at */
2068                 ret2 = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2069                 if (ret2 != 0) {
2070                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2071                                          "message to %u:%llu\n",
2072                                          (unsigned)callers->rd->pnn,
2073                                          (unsigned long long)callers->rd->srvid));
2074                 }
2075         }
2076
2077         talloc_free(tmp_ctx);
2078         talloc_free(rec->ip_reallocate_ctx);
2079         rec->ip_reallocate_ctx = NULL;
2080         rec->reallocate_callers = NULL;
2081         
2082 }
2083
2084
2085 /*
2086   handler for recovery master elections
2087 */
2088 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2089                              TDB_DATA data, void *private_data)
2090 {
2091         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2092         int ret;
2093         struct election_message *em = (struct election_message *)data.dptr;
2094         TALLOC_CTX *mem_ctx;
2095
2096         /* we got an election packet - update the timeout for the election */
2097         talloc_free(rec->election_timeout);
2098         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2099                                                 fast_start ?
2100                                                 timeval_current_ofs(0, 500000) :
2101                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2102                                                 ctdb_election_timeout, rec);
2103
2104         mem_ctx = talloc_new(ctdb);
2105
2106         /* someone called an election. check their election data and,
2107            if we disagree and would rather be the elected node, send a
2108            new election message to all other nodes
2109          */
2110         if (ctdb_election_win(rec, em)) {
2111                 if (!rec->send_election_te) {
2112                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2113                                                                 timeval_current_ofs(0, 500000),
2114                                                                 election_send_request, rec);
2115                 }
2116                 talloc_free(mem_ctx);
2117                 /*unban_all_nodes(ctdb);*/
2118                 return;
2119         }
2120         
2121         /* we didn't win */
2122         talloc_free(rec->send_election_te);
2123         rec->send_election_te = NULL;
2124
2125         if (ctdb->tunable.verify_recovery_lock != 0) {
2126                 /* release the recmaster lock */
2127                 if (em->pnn != ctdb->pnn &&
2128                     ctdb->recovery_lock_fd != -1) {
2129                         close(ctdb->recovery_lock_fd);
2130                         ctdb->recovery_lock_fd = -1;
2131                         unban_all_nodes(ctdb);
2132                 }
2133         }
2134
2135         /* ok, let that guy become recmaster then */
2136         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2137         if (ret != 0) {
2138                 DEBUG(DEBUG_ERR, (__location__ " failed to set the new recmaster\n"));
2139                 talloc_free(mem_ctx);
2140                 return;
2141         }
2142
2143         talloc_free(mem_ctx);
2144         return;
2145 }
2146
2147
2148 /*
2149   force the start of the election process
2150  */
2151 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2152                            struct ctdb_node_map *nodemap)
2153 {
2154         int ret;
2155         struct ctdb_context *ctdb = rec->ctdb;
2156
2157         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2158
2159         /* set all nodes to recovery mode to stop all internode traffic */
2160         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2161         if (ret != 0) {
2162                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2163                 return;
2164         }
2165
2166         talloc_free(rec->election_timeout);
2167         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2168                                                 fast_start ?
2169                                                 timeval_current_ofs(0, 500000) :
2170                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2171                                                 ctdb_election_timeout, rec);
2172
2173         ret = send_election_request(rec, pnn, true);
2174         if (ret!=0) {
2175                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2176                 return;
2177         }
2178
2179         /* wait for a few seconds to collect all responses */
2180         ctdb_wait_election(rec);
2181 }
2182
2183
2184
2185 /*
2186   handler for when a node changes its flags
2187 */
2188 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2189                             TDB_DATA data, void *private_data)
2190 {
2191         int ret;
2192         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2193         struct ctdb_node_map *nodemap=NULL;
2194         TALLOC_CTX *tmp_ctx;
2195         uint32_t changed_flags;
2196         int i;
2197         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2198         int disabled_flag_changed;
2199
2200         if (data.dsize != sizeof(*c)) {
2201                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2202                 return;
2203         }
2204
2205         tmp_ctx = talloc_new(ctdb);
2206         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2207
2208         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2209         if (ret != 0) {
2210                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2211                 talloc_free(tmp_ctx);
2212                 return;         
2213         }
2214
2215
2216         for (i=0;i<nodemap->num;i++) {
2217                 if (nodemap->nodes[i].pnn == c->pnn) break;
2218         }
2219
2220         if (i == nodemap->num) {
2221                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2222                 talloc_free(tmp_ctx);
2223                 return;
2224         }
2225
2226         changed_flags = c->old_flags ^ c->new_flags;
2227
2228         if (nodemap->nodes[i].flags != c->new_flags) {
2229                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2230         }
2231
2232         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2233
2234         nodemap->nodes[i].flags = c->new_flags;
2235
2236         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2237                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2238
2239         if (ret == 0) {
2240                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2241                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2242         }
2243         
2244         if (ret == 0 &&
2245             ctdb->recovery_master == ctdb->pnn &&
2246             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2247                 /* Only do the takeover run if the perm disabled or unhealthy
2248                    flags changed since these will cause an ip failover but not
2249                    a recovery.
2250                    If the node became disconnected or banned this will also
2251                    lead to an ip address failover but that is handled 
2252                    during recovery
2253                 */
2254                 if (disabled_flag_changed) {
2255                         rec->need_takeover_run = true;
2256                 }
2257         }
2258
2259         talloc_free(tmp_ctx);
2260 }
2261
2262 /*
2263   handler for when we need to push out flag changes to all other nodes
2264 */
2265 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2266                             TDB_DATA data, void *private_data)
2267 {
2268         int ret;
2269         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2270         struct ctdb_node_map *nodemap=NULL;
2271         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2272         uint32_t recmaster;
2273         uint32_t *nodes;
2274
2275         /* find the recovery master */
2276         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2277         if (ret != 0) {
2278                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2279                 talloc_free(tmp_ctx);
2280                 return;
2281         }
2282
2283         /* read the node flags from the recmaster */
2284         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2285         if (ret != 0) {
2286                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", recmaster));
2287                 talloc_free(tmp_ctx);
2288                 return;
2289         }
2290         if (c->pnn >= nodemap->num) {
2291                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2292                 talloc_free(tmp_ctx);
2293                 return;
2294         }
2295
2296         /* send the flags update to all connected nodes */
2297         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2298
2299         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2300                                       nodes, 0, CONTROL_TIMEOUT(),
2301                                       false, data,
2302                                       NULL, NULL,
2303                                       NULL) != 0) {
2304                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2305
2306                 talloc_free(tmp_ctx);
2307                 return;
2308         }
2309
2310         talloc_free(tmp_ctx);
2311 }
2312
2313
2314 struct verify_recmode_normal_data {
2315         uint32_t count;
2316         enum monitor_result status;
2317 };
2318
2319 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2320 {
2321         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2322
2323
2324         /* one more node has responded with recmode data*/
2325         rmdata->count--;
2326
2327         /* if we failed to get the recmode, then return an error and let
2328            the main loop try again.
2329         */
2330         if (state->state != CTDB_CONTROL_DONE) {
2331                 if (rmdata->status == MONITOR_OK) {
2332                         rmdata->status = MONITOR_FAILED;
2333                 }
2334                 return;
2335         }
2336
2337         /* if we got a response, then the recmode will be stored in the
2338            status field
2339         */
2340         if (state->status != CTDB_RECOVERY_NORMAL) {
2341                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2342                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2343         }
2344
2345         return;
2346 }
2347
2348
2349 /* verify that all nodes are in normal recovery mode */
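     /* This (and verify_recmaster below) uses a simple asynchronous
        fan-out: rmdata->count is incremented for every control sent and
        decremented in the callback, and the event loop is pumped until
        all outstanding replies (or their timeouts) have come back. */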
2350 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2351 {
2352         struct verify_recmode_normal_data *rmdata;
2353         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2354         struct ctdb_client_control_state *state;
2355         enum monitor_result status;
2356         int j;
2357         
2358         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2359         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2360         rmdata->count  = 0;
2361         rmdata->status = MONITOR_OK;
2362
2363         /* loop over all active nodes and send an async getrecmode call to 
2364            them*/
2365         for (j=0; j<nodemap->num; j++) {
2366                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2367                         continue;
2368                 }
2369                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2370                                         CONTROL_TIMEOUT(), 
2371                                         nodemap->nodes[j].pnn);
2372                 if (state == NULL) {
2373                         /* we failed to send the control, treat this as 
2374                            an error and try again next iteration
2375                         */                      
2376                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2377                         talloc_free(mem_ctx);
2378                         return MONITOR_FAILED;
2379                 }
2380
2381                 /* set up the callback functions */
2382                 state->async.fn = verify_recmode_normal_callback;
2383                 state->async.private_data = rmdata;
2384
2385                 /* one more control to wait for to complete */
2386                 rmdata->count++;
2387         }
2388
2389
2390         /* now wait for up to the maximum number of seconds allowed
2391            or until all nodes we expect a response from have replied
2392         */
2393         while (rmdata->count > 0) {
2394                 event_loop_once(ctdb->ev);
2395         }
2396
2397         status = rmdata->status;
2398         talloc_free(mem_ctx);
2399         return status;
2400 }
2401
2402
2403 struct verify_recmaster_data {
2404         struct ctdb_recoverd *rec;
2405         uint32_t count;
2406         uint32_t pnn;
2407         enum monitor_result status;
2408 };
2409
2410 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2411 {
2412         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2413
2414
2415         /* one more node has responded with recmaster data*/
2416         rmdata->count--;
2417
2418         /* if we failed to get the recmaster, then return an error and let
2419            the main loop try again.
2420         */
2421         if (state->state != CTDB_CONTROL_DONE) {
2422                 if (rmdata->status == MONITOR_OK) {
2423                         rmdata->status = MONITOR_FAILED;
2424                 }
2425                 return;
2426         }
2427
2428         /* if we got a response, then the recmaster will be stored in the
2429            status field
2430         */
2431         if (state->status != rmdata->pnn) {
2432                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2433                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2434                 rmdata->status = MONITOR_ELECTION_NEEDED;
2435         }
2436
2437         return;
2438 }
2439
2440
2441 /* verify that all nodes agree that we are the recmaster */
2442 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2443 {
2444         struct ctdb_context *ctdb = rec->ctdb;
2445         struct verify_recmaster_data *rmdata;
2446         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2447         struct ctdb_client_control_state *state;
2448         enum monitor_result status;
2449         int j;
2450         
2451         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2452         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2453         rmdata->rec    = rec;
2454         rmdata->count  = 0;
2455         rmdata->pnn    = pnn;
2456         rmdata->status = MONITOR_OK;
2457
2458         /* loop over all active nodes and send an async getrecmaster call to 
2459            them*/
2460         for (j=0; j<nodemap->num; j++) {
2461                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2462                         continue;
2463                 }
2464                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2465                                         CONTROL_TIMEOUT(),
2466                                         nodemap->nodes[j].pnn);
2467                 if (state == NULL) {
2468                         /* we failed to send the control, treat this as 
2469                            an error and try again next iteration
2470                         */                      
2471                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2472                         talloc_free(mem_ctx);
2473                         return MONITOR_FAILED;
2474                 }
2475
2476                 /* set up the callback functions */
2477                 state->async.fn = verify_recmaster_callback;
2478                 state->async.private_data = rmdata;
2479
2480                 /* one more control to wait for to complete */
2481                 rmdata->count++;
2482         }
2483
2484
2485         /* now wait for up to the maximum number of seconds allowed
2486            or until all nodes we expect a response from have replied
2487         */
2488         while (rmdata->count > 0) {
2489                 event_loop_once(ctdb->ev);
2490         }
2491
2492         status = rmdata->status;
2493         talloc_free(mem_ctx);
2494         return status;
2495 }
2496
2497
2498 /* called to check that the local allocation of public ip addresses is ok.
2499 */
2500 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2501 {
2502         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2503         struct ctdb_control_get_ifaces *ifaces = NULL;
2504         struct ctdb_all_public_ips *ips = NULL;
2505         struct ctdb_uptime *uptime1 = NULL;
2506         struct ctdb_uptime *uptime2 = NULL;
2507         int ret, j;
2508         bool need_iface_check = false;
2509         bool need_takeover_run = false;
2510
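             /* Take an uptime snapshot before and after reading the
                public ip list.  If the last recovery start/finish times
                change in between, a recovery or ip reallocation is in
                progress and the check below would race with it, so it
                is skipped for this round. */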
2511         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2512                                 CTDB_CURRENT_NODE, &uptime1);
2513         if (ret != 0) {
2514                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2515                 talloc_free(mem_ctx);
2516                 return -1;
2517         }
2518
2519
2520         /* read the interfaces from the local node */
2521         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2522         if (ret != 0) {
2523                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2524                 talloc_free(mem_ctx);
2525                 return -1;
2526         }
2527
2528         if (!rec->ifaces) {
2529                 need_iface_check = true;
2530         } else if (rec->ifaces->num != ifaces->num) {
2531                 need_iface_check = true;
2532         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2533                 need_iface_check = true;
2534         }
2535
2536         if (need_iface_check) {
2537                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2538                                      "local node %u - force takeover run\n",
2539                                      pnn));
2540                 need_takeover_run = true;
2541         }
2542
2543         /* read the ip allocation from the local node */
2544         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2545         if (ret != 0) {
2546                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2547                 talloc_free(mem_ctx);
2548                 return -1;
2549         }
2550
2551         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2552                                 CTDB_CURRENT_NODE, &uptime2);
2553         if (ret != 0) {
2554                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2555                 talloc_free(mem_ctx);
2556                 return -1;
2557         }
2558
2559         /* skip the check if the startrecovery time has changed */
2560         if (timeval_compare(&uptime1->last_recovery_started,
2561                             &uptime2->last_recovery_started) != 0) {
2562                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2563                 talloc_free(mem_ctx);
2564                 return 0;
2565         }
2566
2567         /* skip the check if the endrecovery time has changed */
2568         if (timeval_compare(&uptime1->last_recovery_finished,
2569                             &uptime2->last_recovery_finished) != 0) {
2570                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2571                 talloc_free(mem_ctx);
2572                 return 0;
2573         }
2574
2575         /* skip the check if we have started but not finished recovery */
2576         if (timeval_compare(&uptime1->last_recovery_finished,
2577                             &uptime1->last_recovery_started) != 1) {
2578                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2579                 talloc_free(mem_ctx);
2580
2581                 return 0;
2582         }
2583
2584         talloc_free(rec->ifaces);
2585         rec->ifaces = talloc_steal(rec, ifaces);
2586
2587         /* verify that we have the ip addresses we should have
2588            and that we don't have ones we shouldn't have.
2589            if we find an inconsistency we ask the recovery master
2590            below to do a takeover run rather than trying to fix
2591            the assignment locally ourselves
2592         */
2593         if (ctdb->tunable.disable_ip_failover == 0) {
2594                 for (j=0; j<ips->num; j++) {
2595                         if (ips->ips[j].pnn == pnn) {
2596                                 if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2597                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2598                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2599                                         need_takeover_run = true;
2600                                 }
2601                         } else {
2602                                 if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2603                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2604                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2605                                         need_takeover_run = true;
2606                                 }
2607                         }
2608                 }
2609         }
2610
2611         if (need_takeover_run) {
2612                 struct takeover_run_reply rd;
2613                 TDB_DATA data;
2614
2615                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2616
2617                 rd.pnn = ctdb->pnn;
2618                 rd.srvid = 0;
2619                 data.dptr = (uint8_t *)&rd;
2620                 data.dsize = sizeof(rd);
2621
2622                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2623                 if (ret != 0) {
2624                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2625                 }
2626         }
2627         talloc_free(mem_ctx);
2628         return 0;
2629 }
2630
2631
2632 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2633 {
2634         struct ctdb_node_map **remote_nodemaps = callback_data;
2635
2636         if (node_pnn >= ctdb->num_nodes) {
2637                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2638                 return;
2639         }
2640
2641         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2642
2643 }
2644
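     /*
       ask every active node for its view of the nodemap; the replies
       are stored in remote_nodemaps[], indexed by pnn, by
       async_getnodemap_callback() above.
     */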
2645 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2646         struct ctdb_node_map *nodemap,
2647         struct ctdb_node_map **remote_nodemaps)
2648 {
2649         uint32_t *nodes;
2650
2651         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2652         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2653                                         nodes, 0,
2654                                         CONTROL_TIMEOUT(), false, tdb_null,
2655                                         async_getnodemap_callback,
2656                                         NULL,
2657                                         remote_nodemaps) != 0) {
2658                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2659
2660                 return -1;
2661         }
2662
2663         return 0;
2664 }
2665
2666 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2667 struct ctdb_check_reclock_state {
2668         struct ctdb_context *ctdb;
2669         struct timeval start_time;
2670         int fd[2];
2671         pid_t child;
2672         struct timed_event *te;
2673         struct fd_event *fde;
2674         enum reclock_child_status status;
2675 };
2676
2677 /* when we free the reclock state we must kill any child process.
2678 */
2679 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2680 {
2681         struct ctdb_context *ctdb = state->ctdb;
2682
2683         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2684
2685         if (state->fd[0] != -1) {
2686                 close(state->fd[0]);
2687                 state->fd[0] = -1;
2688         }
2689         if (state->fd[1] != -1) {
2690                 close(state->fd[1]);
2691                 state->fd[1] = -1;
2692         }
2693         kill(state->child, SIGKILL);
2694         return 0;
2695 }
2696
2697 /*
2698   called if our check_reclock child times out. this would happen if
2699   i/o to the reclock file blocks.
2700  */
2701 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2702                                          struct timeval t, void *private_data)
2703 {
2704         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2705                                            struct ctdb_check_reclock_state);
2706
2707         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out. Is the cluster filesystem slow to grant locks?\n"));
2708         state->status = RECLOCK_TIMEOUT;
2709 }
2710
2711 /* this is called when the child process has completed checking the reclock
2712    file and has written data back to us through the pipe.
2713 */
2714 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2715                              uint16_t flags, void *private_data)
2716 {
2717         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2718                                              struct ctdb_check_reclock_state);
2719         char c = 0;
2720         int ret;
2721
2722         /* we got a response from our child process so we can abort the
2723            timeout.
2724         */
2725         talloc_free(state->te);
2726         state->te = NULL;
2727
2728         ret = read(state->fd[0], &c, 1);
2729         if (ret != 1 || c != RECLOCK_OK) {
2730                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2731                 state->status = RECLOCK_FAILED;
2732
2733                 return;
2734         }
2735
2736         state->status = RECLOCK_OK;
2737         return;
2738 }
2739
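     /*
       verify that we still hold the recovery lock.  The pread() on the
       reclock file is done in a forked child that reports back through a
       pipe, so a hung cluster filesystem can only stall the child; the
       parent times out after 15 seconds (ctdb_check_reclock_timeout)
       instead of blocking the whole recovery daemon.
     */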
2740 static int check_recovery_lock(struct ctdb_context *ctdb)
2741 {
2742         int ret;
2743         struct ctdb_check_reclock_state *state;
2744         pid_t parent = getpid();
2745
2746         if (ctdb->recovery_lock_fd == -1) {
2747                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2748                 return -1;
2749         }
2750
2751         state = talloc(ctdb, struct ctdb_check_reclock_state);
2752         CTDB_NO_MEMORY(ctdb, state);
2753
2754         state->ctdb = ctdb;
2755         state->start_time = timeval_current();
2756         state->status = RECLOCK_CHECKING;
2757         state->fd[0] = -1;
2758         state->fd[1] = -1;
2759
2760         ret = pipe(state->fd);
2761         if (ret != 0) {
2762                 talloc_free(state);
2763                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2764                 return -1;
2765         }
2766
2767         state->child = fork();
2768         if (state->child == (pid_t)-1) {
2769                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2770                 close(state->fd[0]);
2771                 state->fd[0] = -1;
2772                 close(state->fd[1]);
2773                 state->fd[1] = -1;
2774                 talloc_free(state);
2775                 return -1;
2776         }
2777
2778         if (state->child == 0) {
2779                 char cc = RECLOCK_OK;
2780                 close(state->fd[0]);
2781                 state->fd[0] = -1;
2782
2783                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
2784                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2785                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2786                         cc = RECLOCK_FAILED;
2787                 }
2788
2789                 write(state->fd[1], &cc, 1);
2790                 /* make sure we die when our parent dies */
2791                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2792                         sleep(5);
2793                         write(state->fd[1], &cc, 1);
2794                 }
2795                 _exit(0);
2796         }
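        /* parent: close the write end of the pipe and keep the read
           end to receive the child's status byte */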
2797         close(state->fd[1]);
2798         state->fd[1] = -1;
2799         set_close_on_exec(state->fd[0]);
2800
2801         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2802
2803         talloc_set_destructor(state, check_reclock_destructor);
2804
2805         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2806                                     ctdb_check_reclock_timeout, state);
2807         if (state->te == NULL) {
2808                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2809                 talloc_free(state);
2810                 return -1;
2811         }
2812
2813         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2814                                 EVENT_FD_READ,
2815                                 reclock_child_handler,
2816                                 (void *)state);
2817
2818         if (state->fde == NULL) {
2819                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2820                 talloc_free(state);
2821                 return -1;
2822         }
2823         tevent_fd_set_auto_close(state->fde);
2824
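        /* pump the event loop until the child has answered or the
           timed event has flagged a timeout */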
2825         while (state->status == RECLOCK_CHECKING) {
2826                 event_loop_once(ctdb->ev);
2827         }
2828
2829         if (state->status == RECLOCK_FAILED) {
2830                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2831                 close(ctdb->recovery_lock_fd);
2832                 ctdb->recovery_lock_fd = -1;
2833                 talloc_free(state);
2834                 return -1;
2835         }
2836
2837         talloc_free(state);
2838         return 0;
2839 }
2840
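/*
  fetch the current reclock file setting from the main daemon and keep
  our cached copy in sync with it, closing the cached lock fd when the
  reclock has been disabled or its path has changed
*/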
2841 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2842 {
2843         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2844         const char *reclockfile;
2845
2846         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2847                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2848                 talloc_free(tmp_ctx);
2849                 return -1;      
2850         }
2851
2852         if (reclockfile == NULL) {
2853                 if (ctdb->recovery_lock_file != NULL) {
2854                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2855                         talloc_free(ctdb->recovery_lock_file);
2856                         ctdb->recovery_lock_file = NULL;
2857                         if (ctdb->recovery_lock_fd != -1) {
2858                                 close(ctdb->recovery_lock_fd);
2859                                 ctdb->recovery_lock_fd = -1;
2860                         }
2861                 }
2862                 ctdb->tunable.verify_recovery_lock = 0;
2863                 talloc_free(tmp_ctx);
2864                 return 0;
2865         }
2866
2867         if (ctdb->recovery_lock_file == NULL) {
2868                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2869                 if (ctdb->recovery_lock_fd != -1) {
2870                         close(ctdb->recovery_lock_fd);
2871                         ctdb->recovery_lock_fd = -1;
2872                 }
2873                 talloc_free(tmp_ctx);
2874                 return 0;
2875         }
2876
2877
2878         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2879                 talloc_free(tmp_ctx);
2880                 return 0;
2881         }
2882
2883         talloc_free(ctdb->recovery_lock_file);
2884         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2885         ctdb->tunable.verify_recovery_lock = 0;
2886         if (ctdb->recovery_lock_fd != -1) {
2887                 close(ctdb->recovery_lock_fd);
2888                 ctdb->recovery_lock_fd = -1;
2889         }
2890
2891         talloc_free(tmp_ctx);
2892         return 0;
2893 }
2894
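/*
  one pass of the recovery daemon's monitoring work: check that the
  main daemon is alive, refresh tunables and the reclock setting, work
  out who the recovery master is (forcing an election when needed)
  and, if that is us, verify that nodemaps, node flags and the vnnmap
  are consistent across all active nodes, triggering a recovery when
  they are not
*/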
2895 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2896                       TALLOC_CTX *mem_ctx)
2897 {
2898         uint32_t pnn;
2899         struct ctdb_node_map *nodemap=NULL;
2900         struct ctdb_node_map *recmaster_nodemap=NULL;
2901         struct ctdb_node_map **remote_nodemaps=NULL;
2902         struct ctdb_vnn_map *vnnmap=NULL;
2903         struct ctdb_vnn_map *remote_vnnmap=NULL;
2904         int32_t debug_level;
2905         int i, j, ret;
2906
2907
2908
2909         /* verify that the main daemon is still running */
2910         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2911                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2912                 exit(-1);
2913         }
2914
2915         /* ping the local daemon to tell it we are alive */
2916         ctdb_ctrl_recd_ping(ctdb);
2917
2918         if (rec->election_timeout) {
2919                 /* an election is in progress */
2920                 return;
2921         }
2922
2923         /* read the debug level from the parent and update locally */
2924         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2925         if (ret !=0) {
2926                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2927                 return;
2928         }
2929         LogLevel = debug_level;
2930
2931
2932         /* We must check if we need to ban a node here, and we want to do this
2933            as early as possible so we don't wait until we have pulled the node
2934            map from the local node. That's why we use the hardcoded threshold
2935            of 20 recoveries below. */
2936         for (i=0; i<ctdb->num_nodes; i++) {
2937                 struct ctdb_banning_state *ban_state;
2938
2939                 if (ctdb->nodes[i]->ban_state == NULL) {
2940                         continue;
2941                 }
2942                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2943                 if (ban_state->count < 20) {
2944                         continue;
2945                 }
2946                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2947                         ctdb->nodes[i]->pnn, ban_state->count,
2948                         ctdb->tunable.recovery_ban_period));
2949                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2950                 ban_state->count = 0;
2951         }
2952
2953         /* get relevant tunables */
2954         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2955         if (ret != 0) {
2956                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2957                 return;
2958         }
2959
2960         /* get the current recovery lock file from the server */
2961         if (update_recovery_lock_file(ctdb) != 0) {
2962                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2963                 return;
2964         }
2965
2966         /* Make sure that if recovery lock verification becomes disabled,
2967            we close the file.
2968         */
2969         if (ctdb->tunable.verify_recovery_lock == 0) {
2970                 if (ctdb->recovery_lock_fd != -1) {
2971                         close(ctdb->recovery_lock_fd);
2972                         ctdb->recovery_lock_fd = -1;
2973                 }
2974         }
2975
2976         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2977         if (pnn == (uint32_t)-1) {
2978                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2979                 return;
2980         }
2981
2982         /* get the vnnmap */
2983         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2984         if (ret != 0) {
2985                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2986                 return;
2987         }
2988
2989
2990         /* get number of nodes */
2991         if (rec->nodemap) {
2992                 talloc_free(rec->nodemap);
2993                 rec->nodemap = NULL;
2994                 nodemap=NULL;
2995         }
2996         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2997         if (ret != 0) {
2998                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2999                 return;
3000         }
3001         nodemap = rec->nodemap;
3002
3003         /* check which node is the recovery master */
3004         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3005         if (ret != 0) {
3006                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3007                 return;
3008         }
3009
3010         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3011         if (rec->recmaster != pnn) {
3012                 if (rec->ip_reallocate_ctx != NULL) {
3013                         talloc_free(rec->ip_reallocate_ctx);
3014                         rec->ip_reallocate_ctx = NULL;
3015                         rec->reallocate_callers = NULL;
3016                 }
3017         }
3018
3019         if (rec->recmaster == (uint32_t)-1) {
3020                 DEBUG(DEBUG_NOTICE,(__location__ " Recovery master not yet known - forcing election\n"));
3021                 force_election(rec, pnn, nodemap);
3022                 return;
3023         }
3024
3025
3026         /* if the local daemon is STOPPED, we verify that the databases are
3027            also frozen and that the recmode is set to active
3028         */
3029         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3030                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3031                 if (ret != 0) {
3032                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3033                 }
3034                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3035                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3036
3037                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3038                         if (ret != 0) {
3039                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3040                                 return;
3041                         }
3042                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3043                         if (ret != 0) {
3044                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3045
3046                                 return;
3047                         }
3048                         return;
3049                 }
3050         }
3051         /* If the local node is stopped and we happen to be the recmaster,
3052            yield that role by forcing a new election
3053         */
3054         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3055                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3056                 force_election(rec, pnn, nodemap);
3057                 return;
3058         }
3059         
3060         /* check that we (the recovery daemon) and the local ctdb daemon
3061            agree on whether we are banned or not
3062         */
3063 //qqq
3064
3065         /* remember our own node flags */
3066         rec->node_flags = nodemap->nodes[pnn].flags;
3067
3068         /* count how many active nodes there are */
3069         rec->num_active    = 0;
3070         rec->num_connected = 0;
3071         for (i=0; i<nodemap->num; i++) {
3072                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3073                         rec->num_active++;
3074                 }
3075                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3076                         rec->num_connected++;
3077                 }
3078         }
3079
3080
3081         /* verify that the recmaster node is still active */
3082         for (j=0; j<nodemap->num; j++) {
3083                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3084                         break;
3085                 }
3086         }
3087
3088         if (j == nodemap->num) {
3089                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3090                 force_election(rec, pnn, nodemap);
3091                 return;
3092         }
3093
3094         /* if recovery master is disconnected we must elect a new recmaster */
3095         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3096                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3097                 force_election(rec, pnn, nodemap);
3098                 return;
3099         }
3100
3101         /* grab the nodemap from the recovery master to check if it is banned */
3102         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3103                                    mem_ctx, &recmaster_nodemap);
3104         if (ret != 0) {
3105                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3106                           nodemap->nodes[j].pnn));
3107                 return;
3108         }
3109
3110
3111         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3112                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3113                 force_election(rec, pnn, nodemap);
3114                 return;
3115         }
3116
3117
3118         /* verify that we have all the ip addresses we should have and
3119          * don't have addresses we shouldn't have.
3120          */
3121         if (ctdb->tunable.disable_ip_failover == 0) {
3122                 if (rec->ip_check_disable_ctx == NULL) {
3123                         if (verify_local_ip_allocation(ctdb, rec, pnn) != 0) {
3124                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3125                         }
3126                 }
3127         }
3128
3129
3130         /* if we are not the recmaster then we do not need to check
3131            if recovery is needed
3132          */
3133         if (pnn != rec->recmaster) {
3134                 return;
3135         }
3136
3137
3138         /* ensure our local copies of flags are right */
3139         ret = update_local_flags(rec, nodemap);
3140         if (ret == MONITOR_ELECTION_NEEDED) {
3141                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3142                 force_election(rec, pnn, nodemap);
3143                 return;
3144         }
3145         if (ret != MONITOR_OK) {
3146                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3147                 return;
3148         }
3149
3150         if (ctdb->num_nodes != nodemap->num) {
3151                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3152                 reload_nodes_file(ctdb);
3153                 return;
3154         }
3155
3156         /* verify that all active nodes agree that we are the recmaster */
3157         switch (verify_recmaster(rec, nodemap, pnn)) {
3158         case MONITOR_RECOVERY_NEEDED:
3159                 /* can not happen */
3160                 return;
3161         case MONITOR_ELECTION_NEEDED:
3162                 force_election(rec, pnn, nodemap);
3163                 return;
3164         case MONITOR_OK:
3165                 break;
3166         case MONITOR_FAILED:
3167                 return;
3168         }
3169
3170
3171         if (rec->need_recovery) {
3172                 /* a previous recovery didn't finish */
3173                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3174                 return;
3175         }
3176
3177         /* verify that all active nodes are in normal mode 
3178            and not in recovery mode 
3179         */
3180         switch (verify_recmode(ctdb, nodemap)) {
3181         case MONITOR_RECOVERY_NEEDED:
3182                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3183                 return;
3184         case MONITOR_FAILED:
3185                 return;
3186         case MONITOR_ELECTION_NEEDED:
3187                 /* can not happen */
3188         case MONITOR_OK:
3189                 break;
3190         }
3191
3192
3193         if (ctdb->tunable.verify_recovery_lock != 0) {
3194                 /* we should have the reclock - check it's not stale */
3195                 ret = check_recovery_lock(ctdb);
3196                 if (ret != 0) {
3197                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3198                         ctdb_set_culprit(rec, ctdb->pnn);
3199                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3200                         return;
3201                 }
3202         }
3203
3204         /* if takeover runs have been requested, perform them and notify the waiters */
3205         if (rec->reallocate_callers) {
3206                 process_ipreallocate_requests(ctdb, rec);
3207         }
3208
3209         /* get the nodemap for all active remote nodes
3210          */
3211         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3212         if (remote_nodemaps == NULL) {
3213                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3214                 return;
3215         }
3216         for(i=0; i<nodemap->num; i++) {
3217                 remote_nodemaps[i] = NULL;
3218         }
3219         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3220                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3221                 return;
3222         } 
3223
3224         /* verify that all other nodes have the same nodemap as we have
3225         */
3226         for (j=0; j<nodemap->num; j++) {
3227                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3228                         continue;
3229                 }
3230
3231                 if (remote_nodemaps[j] == NULL) {
3232                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3233                         ctdb_set_culprit(rec, j);
3234
3235                         return;
3236                 }
3237
3238                 /* if the nodes disagree on how many nodes there are
3239                    then this is a good reason to try recovery
3240                  */
3241                 if (remote_nodemaps[j]->num != nodemap->num) {
3242                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3243                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3244                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3245                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3246                         return;
3247                 }
3248
3249                 /* if the nodes disagree on which nodes exist and are
3250                    active, then that is also a good reason to do recovery
3251                  */
3252                 for (i=0;i<nodemap->num;i++) {
3253                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3254                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3255                                           nodemap->nodes[j].pnn, i, 
3256                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3257                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3258                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3259                                             vnnmap);
3260                                 return;
3261                         }
3262                 }
3263
3264                 /* verify the flags are consistent
3265                 */
3266                 for (i=0; i<nodemap->num; i++) {
3267                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3268                                 continue;
3269                         }
3270                         
3271                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3272                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3273                                   nodemap->nodes[j].pnn, 
3274                                   nodemap->nodes[i].pnn, 
3275                                   remote_nodemaps[j]->nodes[i].flags,
3276                                   nodemap->nodes[i].flags));
3277                                 if (i == j) {
3278                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3279                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3280                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3281                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3282                                                     vnnmap);
3283                                         return;
3284                                 } else {
3285                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3286                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3287                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3288                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3289                                                     vnnmap);
3290                                         return;
3291                                 }
3292                         }
3293                 }
3294         }
3295
3296
3297         /* there had better be the same number of lmasters in the vnnmap
3298            as there are active nodes, or we will have to do a recovery
3299          */
3300         if (vnnmap->size != rec->num_active) {
3301                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3302                           vnnmap->size, rec->num_active));
3303                 ctdb_set_culprit(rec, ctdb->pnn);
3304                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3305                 return;
3306         }
3307
3308         /* verify that all active nodes in the nodemap also exist in 
3309            the vnnmap.
3310          */
3311         for (j=0; j<nodemap->num; j++) {
3312                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3313                         continue;
3314                 }
3315                 if (nodemap->nodes[j].pnn == pnn) {
3316                         continue;
3317                 }
3318
3319                 for (i=0; i<vnnmap->size; i++) {
3320                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3321                                 break;
3322                         }
3323                 }
3324                 if (i == vnnmap->size) {
3325                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3326                                   nodemap->nodes[j].pnn));
3327                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3328                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3329                         return;
3330                 }
3331         }
3332
3333         
3334         /* verify that all other nodes have the same vnnmap
3335            and are from the same generation
3336          */
3337         for (j=0; j<nodemap->num; j++) {
3338                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3339                         continue;
3340                 }
3341                 if (nodemap->nodes[j].pnn == pnn) {
3342                         continue;
3343                 }
3344
3345                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3346                                           mem_ctx, &remote_vnnmap);
3347                 if (ret != 0) {
3348                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3349                                   nodemap->nodes[j].pnn));
3350                         return;
3351                 }
3352
3353                 /* verify the vnnmap generation is the same */
3354                 if (vnnmap->generation != remote_vnnmap->generation) {
3355                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3356                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3357                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3358                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3359                         return;
3360                 }
3361
3362                 /* verify the vnnmap size is the same */
3363                 if (vnnmap->size != remote_vnnmap->size) {
3364                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3365                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3366                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3367                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3368                         return;
3369                 }
3370
3371                 /* verify the vnnmap is the same */
3372                 for (i=0;i<vnnmap->size;i++) {
3373                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3374                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3375                                           nodemap->nodes[j].pnn));
3376                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3377                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3378                                             vnnmap);
3379                                 return;
3380                         }
3381                 }
3382         }
3383
3384         /* we might need to change who has what IP assigned */
3385         if (rec->need_takeover_run) {
3386                 uint32_t culprit = (uint32_t)-1;
3387
3388                 rec->need_takeover_run = false;
3389
3390                 /* update the list of public ips that a node can handle for
3391                    all connected nodes
3392                 */
3393                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3394                 if (ret != 0) {
3395                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3396                                          culprit));
3397                         ctdb_set_culprit(rec, culprit);
3398                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3399                         return;
3400                 }
3401
3402                 /* execute the "startrecovery" event script on all nodes */
3403                 ret = run_startrecovery_eventscript(rec, nodemap);
3404                 if (ret!=0) {
3405                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3406                         ctdb_set_culprit(rec, ctdb->pnn);
3407                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3408                         return;
3409                 }
3410
3411                 ret = ctdb_takeover_run(ctdb, nodemap);
3412                 if (ret != 0) {
3413                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3414                         ctdb_set_culprit(rec, ctdb->pnn);
3415                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3416                         return;
3417                 }
3418
3419                 /* execute the "recovered" event script on all nodes */
3420                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3421 #if 0
3422 // we can't check whether the event completed successfully
3423 // since this script WILL fail if the node is in recovery mode,
3424 // and if that race happens, the code here would just cause a second
3425 // cascading recovery.
3426                 if (ret!=0) {
3427                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3428                         ctdb_set_culprit(rec, ctdb->pnn);
3429                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3430                 }
3431 #endif
3432         }
3433 }
3434
3435 /*
3436   the main monitoring loop
3437  */
3438 static void monitor_cluster(struct ctdb_context *ctdb)
3439 {
3440         struct ctdb_recoverd *rec;
3441
3442         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3443
3444         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3445         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3446
3447         rec->ctdb = ctdb;
3448
3449         rec->priority_time = timeval_current();
3450
3451         /* register a message port for sending memory dumps */
3452         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3453
3454         /* register a message port for recovery elections */
3455         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3456
3457         /* when nodes are disabled/enabled */
3458         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3459
3460         /* when we are asked to push out a flag change */
3461         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3462
3463         /* register a message port for vacuum fetch */
3464         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3465
3466         /* register a message port for reloadnodes  */
3467         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3468
3469         /* register a message port for performing a takeover run */
3470         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3471
3472         /* register a message port for disabling the ip check for a short while */
3473         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3474
3475         /* register a message port for updating the recovery daemons node assignment for an ip */
3476         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3477
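        /* run the monitoring work in a loop, one pass per
           recover_interval seconds */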
3478         for (;;) {
3479                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3480                 struct timeval start;
3481                 double elapsed;
3482
3483                 if (!mem_ctx) {
3484                         DEBUG(DEBUG_CRIT,(__location__
3485                                           " Failed to create temp context\n"));
3486                         exit(-1);
3487                 }
3488
3489                 start = timeval_current();
3490                 main_loop(ctdb, rec, mem_ctx);
3491                 talloc_free(mem_ctx);
3492
3493                 /* we only run the monitoring loop once every recover_interval seconds */
3494                 elapsed = timeval_elapsed(&start);
3495                 if (elapsed < ctdb->tunable.recover_interval) {
3496                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3497                                           - elapsed);
3498                 }
3499         }
3500 }
3501
3502 /*
3503   event handler for when the main ctdbd dies
3504  */
3505 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3506                                  uint16_t flags, void *private_data)
3507 {
3508         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3509         _exit(1);
3510 }
3511
3512 /*
3513   called regularly to verify that the recovery daemon is still running
3514  */
3515 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3516                               struct timeval yt, void *p)
3517 {
3518         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3519
3520         if (kill(ctdb->recoverd_pid, 0) != 0) {
3521                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3522
3523                 ctdb_stop_recoverd(ctdb);
3524                 ctdb_stop_keepalive(ctdb);
3525                 ctdb_stop_monitoring(ctdb);
3526                 ctdb_release_all_ips(ctdb);
3527                 if (ctdb->methods != NULL) {
3528                         ctdb->methods->shutdown(ctdb);
3529                 }
3530                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3531
3532                 exit(10);       
3533         }
3534
3535         event_add_timed(ctdb->ev, ctdb, 
3536                         timeval_current_ofs(30, 0),
3537                         ctdb_check_recd, ctdb);
3538 }
3539
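/*
  SIGCHLD handler: reap any exited child processes (such as reclock
  check children) so they do not remain as zombies
*/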
3540 static void recd_sig_child_handler(struct event_context *ev,
3541         struct signal_event *se, int signum, int count,
3542         void *dont_care, 
3543         void *private_data)
3544 {
3545 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3546         int status;
3547         pid_t pid = -1;
3548
3549         while (pid != 0) {
3550                 pid = waitpid(-1, &status, WNOHANG);
3551                 if (pid == -1) {
3552                         if (errno != ECHILD) {
3553                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3554                         }
3555                         return;
3556                 }
3557                 if (pid > 0) {
3558                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3559                 }
3560         }
3561 }
3562
3563 /*
3564   start up the recovery daemon as a child of the main ctdb daemon
3565  */
3566 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3567 {
3568         int fd[2];
3569         struct signal_event *se;
3570         struct tevent_fd *fde;
3571
3572         if (pipe(fd) != 0) {
3573                 return -1;
3574         }
3575
3576         ctdb->ctdbd_pid = getpid();
3577
3578         ctdb->recoverd_pid = fork();
3579         if (ctdb->recoverd_pid == -1) {
3580                 return -1;
3581         }
3582         
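        /* parent (main ctdb daemon): close the read end, keep the
           write end open for our lifetime so the recovery daemon can
           detect when we exit, and start watching the child */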
3583         if (ctdb->recoverd_pid != 0) {
3584                 close(fd[0]);
3585                 event_add_timed(ctdb->ev, ctdb, 
3586                                 timeval_current_ofs(30, 0),
3587                                 ctdb_check_recd, ctdb);
3588                 return 0;
3589         }
3590
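        /* child (recovery daemon): close the write end; the read end
           is watched below so we exit if the parent goes away */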
3591         close(fd[1]);
3592
3593         srandom(getpid() ^ time(NULL));
3594
3595         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3596                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. Shutting down.\n"));
3597                 exit(1);
3598         }
3599
3600         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3601
3602         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3603                      ctdb_recoverd_parent, &fd[0]);     
3604         tevent_fd_set_auto_close(fde);
3605
3606         /* set up a handler to pick up sigchld */
3607         se = event_add_signal(ctdb->ev, ctdb,
3608                                      SIGCHLD, 0,
3609                                      recd_sig_child_handler,
3610                                      ctdb);
3611         if (se == NULL) {
3612                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3613                 exit(1);
3614         }
3615
3616         monitor_cluster(ctdb);
3617
3618         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3619         return -1;
3620 }
3621
3622 /*
3623   shutdown the recovery daemon
3624  */
3625 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3626 {
3627         if (ctdb->recoverd_pid == 0) {
3628                 return;
3629         }
3630
3631         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3632         kill(ctdb->recoverd_pid, SIGTERM);
3633 }