during ip allocation, there are failure modes where a node might hold an ip address
[metze/ctdb/wip.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
/* list of "ctdb ipreallocate" processes to call back when we have
   finished the takeover run.

   Singly linked: each entry points at the next caller and carries the
   rd_memdump_reply describing where that caller's reply should go.
*/
struct ip_reallocate_list {
        struct ip_reallocate_list *next;   /* next waiting caller, or NULL */
        struct rd_memdump_reply *rd;       /* reply details for this caller */
};
41
/*
  per-node misbehaviour bookkeeping, attached to a node's ban_state
  pointer by ctdb_set_culprit_count()
 */
struct ctdb_banning_state {
        uint32_t count;                      /* accumulated culprit credits */
        struct timeval last_reported_time;   /* when credits were last added */
};
46
/*
  private state of recovery daemon
 */
struct ctdb_recoverd {
        struct ctdb_context *ctdb;
        uint32_t recmaster;
        uint32_t num_active;
        uint32_t num_connected;
        /* most recent node passed to ctdb_set_culprit_count() */
        uint32_t last_culprit_node;
        struct ctdb_node_map *nodemap;
        struct timeval priority_time;
        bool need_takeover_run;
        bool need_recovery;
        uint32_t node_flags;
        struct timed_event *send_election_te;
        struct timed_event *election_timeout;
        /* head of the list of in-progress vacuum fetches; entries unlink
           themselves in vacuum_info_destructor() */
        struct vacuum_info *vacuum_info;
        TALLOC_CTX *ip_reallocate_ctx;
        /* list of struct ip_reallocate_list entries */
        struct ip_reallocate_list *reallocate_callers;
        TALLOC_CTX *ip_check_disable_ctx;
        struct ctdb_control_get_ifaces *ifaces;
};
69
/* Both macros expand to a timeval_current_ofs() call that reads
   ctdb->tunable, so a variable named "ctdb" must be in scope wherever
   they are used.
   CONTROL_TIMEOUT uses tunable.recover_timeout,
   MONITOR_TIMEOUT uses tunable.recover_interval. */
#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
72
73
74 /*
75   ban a node for a period of time
76  */
77 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
78 {
79         int ret;
80         struct ctdb_context *ctdb = rec->ctdb;
81         struct ctdb_ban_time bantime;
82        
83         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
84
85         if (!ctdb_validate_pnn(ctdb, pnn)) {
86                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
87                 return;
88         }
89
90         bantime.pnn  = pnn;
91         bantime.time = ban_time;
92
93         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
94         if (ret != 0) {
95                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
96                 return;
97         }
98
99 }
100
/* result codes of a monitoring pass.
   NOTE(review): the code that returns and consumes these values is not in
   this part of the file - confirm exact semantics there. */
enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
102
103
104 /*
105   run the "recovered" eventscript on all nodes
106  */
107 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
108 {
109         TALLOC_CTX *tmp_ctx;
110         uint32_t *nodes;
111
112         tmp_ctx = talloc_new(ctdb);
113         CTDB_NO_MEMORY(ctdb, tmp_ctx);
114
115         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
116         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
117                                         nodes, 0,
118                                         CONTROL_TIMEOUT(), false, tdb_null,
119                                         NULL, NULL,
120                                         NULL) != 0) {
121                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
122
123                 talloc_free(tmp_ctx);
124                 return -1;
125         }
126
127         talloc_free(tmp_ctx);
128         return 0;
129 }
130
131 /*
132   remember the trouble maker
133  */
134 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
135 {
136         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
137         struct ctdb_banning_state *ban_state;
138
139         if (culprit > ctdb->num_nodes) {
140                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
141                 return;
142         }
143
144         if (ctdb->nodes[culprit]->ban_state == NULL) {
145                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
146                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
147
148                 
149         }
150         ban_state = ctdb->nodes[culprit]->ban_state;
151         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
152                 /* this was the first time in a long while this node
153                    misbehaved so we will forgive any old transgressions.
154                 */
155                 ban_state->count = 0;
156         }
157
158         ban_state->count += count;
159         ban_state->last_reported_time = timeval_current();
160         rec->last_culprit_node = culprit;
161 }
162
163 /*
164   remember the trouble maker
165  */
166 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
167 {
168         ctdb_set_culprit_count(rec, culprit, 1);
169 }
170
171
172 /* this callback is called for every node that failed to execute the
173    start recovery event
174 */
175 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
176 {
177         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
178
179         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
180
181         ctdb_set_culprit(rec, node_pnn);
182 }
183
184 /*
185   run the "startrecovery" eventscript on all nodes
186  */
187 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
188 {
189         TALLOC_CTX *tmp_ctx;
190         uint32_t *nodes;
191         struct ctdb_context *ctdb = rec->ctdb;
192
193         tmp_ctx = talloc_new(ctdb);
194         CTDB_NO_MEMORY(ctdb, tmp_ctx);
195
196         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
198                                         nodes, 0,
199                                         CONTROL_TIMEOUT(), false, tdb_null,
200                                         NULL,
201                                         startrecovery_fail_callback,
202                                         rec) != 0) {
203                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
204                 talloc_free(tmp_ctx);
205                 return -1;
206         }
207
208         talloc_free(tmp_ctx);
209         return 0;
210 }
211
212 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
213 {
214         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
215                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
216                 return;
217         }
218         if (node_pnn < ctdb->num_nodes) {
219                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
220         }
221 }
222
223 /*
224   update the node capabilities for all connected nodes
225  */
226 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
227 {
228         uint32_t *nodes;
229         TALLOC_CTX *tmp_ctx;
230
231         tmp_ctx = talloc_new(ctdb);
232         CTDB_NO_MEMORY(ctdb, tmp_ctx);
233
234         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
235         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
236                                         nodes, 0,
237                                         CONTROL_TIMEOUT(),
238                                         false, tdb_null,
239                                         async_getcap_callback, NULL,
240                                         NULL) != 0) {
241                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
242                 talloc_free(tmp_ctx);
243                 return -1;
244         }
245
246         talloc_free(tmp_ctx);
247         return 0;
248 }
249
250 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
251 {
252         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
253
254         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
255         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
256 }
257
258 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
259 {
260         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
261
262         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
263         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
264 }
265
266 /*
267   change recovery mode on all nodes
268  */
269 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
270 {
271         TDB_DATA data;
272         uint32_t *nodes;
273         TALLOC_CTX *tmp_ctx;
274
275         tmp_ctx = talloc_new(ctdb);
276         CTDB_NO_MEMORY(ctdb, tmp_ctx);
277
278         /* freeze all nodes */
279         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
280         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
281                 int i;
282
283                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
284                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
285                                                 nodes, i,
286                                                 CONTROL_TIMEOUT(),
287                                                 false, tdb_null,
288                                                 NULL,
289                                                 set_recmode_fail_callback,
290                                                 rec) != 0) {
291                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
292                                 talloc_free(tmp_ctx);
293                                 return -1;
294                         }
295                 }
296         }
297
298
299         data.dsize = sizeof(uint32_t);
300         data.dptr = (unsigned char *)&rec_mode;
301
302         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
303                                         nodes, 0,
304                                         CONTROL_TIMEOUT(),
305                                         false, data,
306                                         NULL, NULL,
307                                         NULL) != 0) {
308                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
309                 talloc_free(tmp_ctx);
310                 return -1;
311         }
312
313         talloc_free(tmp_ctx);
314         return 0;
315 }
316
317 /*
318   change recovery master on all node
319  */
320 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
321 {
322         TDB_DATA data;
323         TALLOC_CTX *tmp_ctx;
324         uint32_t *nodes;
325
326         tmp_ctx = talloc_new(ctdb);
327         CTDB_NO_MEMORY(ctdb, tmp_ctx);
328
329         data.dsize = sizeof(uint32_t);
330         data.dptr = (unsigned char *)&pnn;
331
332         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
333         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
334                                         nodes, 0,
335                                         CONTROL_TIMEOUT(), false, data,
336                                         NULL, NULL,
337                                         NULL) != 0) {
338                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
339                 talloc_free(tmp_ctx);
340                 return -1;
341         }
342
343         talloc_free(tmp_ctx);
344         return 0;
345 }
346
347 /* update all remote nodes to use the same db priority that we have
348    this can fail if the remove node has not yet been upgraded to 
349    support this function, so we always return success and never fail
350    a recovery if this call fails.
351 */
352 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
353         struct ctdb_node_map *nodemap, 
354         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
355 {
356         int db;
357         uint32_t *nodes;
358
359         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
360
361         /* step through all local databases */
362         for (db=0; db<dbmap->num;db++) {
363                 TDB_DATA data;
364                 struct ctdb_db_priority db_prio;
365                 int ret;
366
367                 db_prio.db_id     = dbmap->dbs[db].dbid;
368                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
369                 if (ret != 0) {
370                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
371                         continue;
372                 }
373
374                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
375
376                 data.dptr  = (uint8_t *)&db_prio;
377                 data.dsize = sizeof(db_prio);
378
379                 if (ctdb_client_async_control(ctdb,
380                                         CTDB_CONTROL_SET_DB_PRIORITY,
381                                         nodes, 0,
382                                         CONTROL_TIMEOUT(), false, data,
383                                         NULL, NULL,
384                                         NULL) != 0) {
385                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
386                 }
387         }
388
389         return 0;
390 }                       
391
392 /*
393   ensure all other nodes have attached to any databases that we have
394  */
395 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
396                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
397 {
398         int i, j, db, ret;
399         struct ctdb_dbid_map *remote_dbmap;
400
401         /* verify that all other nodes have all our databases */
402         for (j=0; j<nodemap->num; j++) {
403                 /* we dont need to ourself ourselves */
404                 if (nodemap->nodes[j].pnn == pnn) {
405                         continue;
406                 }
407                 /* dont check nodes that are unavailable */
408                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
409                         continue;
410                 }
411
412                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
413                                          mem_ctx, &remote_dbmap);
414                 if (ret != 0) {
415                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
416                         return -1;
417                 }
418
419                 /* step through all local databases */
420                 for (db=0; db<dbmap->num;db++) {
421                         const char *name;
422
423
424                         for (i=0;i<remote_dbmap->num;i++) {
425                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
426                                         break;
427                                 }
428                         }
429                         /* the remote node already have this database */
430                         if (i!=remote_dbmap->num) {
431                                 continue;
432                         }
433                         /* ok so we need to create this database */
434                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
435                                             mem_ctx, &name);
436                         if (ret != 0) {
437                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
438                                 return -1;
439                         }
440                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
441                                            mem_ctx, name, dbmap->dbs[db].persistent);
442                         if (ret != 0) {
443                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
444                                 return -1;
445                         }
446                 }
447         }
448
449         return 0;
450 }
451
452
453 /*
454   ensure we are attached to any databases that anyone else is attached to
455  */
456 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
457                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
458 {
459         int i, j, db, ret;
460         struct ctdb_dbid_map *remote_dbmap;
461
462         /* verify that we have all database any other node has */
463         for (j=0; j<nodemap->num; j++) {
464                 /* we dont need to ourself ourselves */
465                 if (nodemap->nodes[j].pnn == pnn) {
466                         continue;
467                 }
468                 /* dont check nodes that are unavailable */
469                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
470                         continue;
471                 }
472
473                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
474                                          mem_ctx, &remote_dbmap);
475                 if (ret != 0) {
476                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
477                         return -1;
478                 }
479
480                 /* step through all databases on the remote node */
481                 for (db=0; db<remote_dbmap->num;db++) {
482                         const char *name;
483
484                         for (i=0;i<(*dbmap)->num;i++) {
485                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
486                                         break;
487                                 }
488                         }
489                         /* we already have this db locally */
490                         if (i!=(*dbmap)->num) {
491                                 continue;
492                         }
493                         /* ok so we need to create this database and
494                            rebuild dbmap
495                          */
496                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
497                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
498                         if (ret != 0) {
499                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
500                                           nodemap->nodes[j].pnn));
501                                 return -1;
502                         }
503                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
504                                            remote_dbmap->dbs[db].persistent);
505                         if (ret != 0) {
506                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
507                                 return -1;
508                         }
509                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
510                         if (ret != 0) {
511                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
512                                 return -1;
513                         }
514                 }
515         }
516
517         return 0;
518 }
519
520
/*
  pull the remote database contents from one node into the recdb

  The whole database dbid is fetched from srcnode with ctdb_ctrl_pulldb()
  (CTDB_LMASTER_ANY) as one marshalled buffer, and each record in it is
  merged into recdb->tdb. A pulled record replaces what recdb holds when
  recdb has no record for the key, when the stored rsn is lower, or when
  the rsn is equal and the stored dmaster is not ctdb->recovery_master.

  Returns 0 on success, -1 on a failed pull, a malformed reply/record or
  a failed tdb_store().

  The "persistent" argument is not used in the body.

  NOTE(review): only the fixed buffer header is size-checked against
  outdata.dsize; reply->count, rec->length, rec->keylen and rec->datalen
  are trusted as sent by the remote node. Also the result of
  talloc_new(recdb) is not checked for NULL.
 */
static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
                                    struct tdb_wrap *recdb, uint32_t dbid,
                                    bool persistent)
{
        int ret;
        TDB_DATA outdata;
        struct ctdb_marshall_buffer *reply;
        struct ctdb_rec_data *rec;
        int i;
        /* scratch context; the pulled buffer is allocated on it */
        TALLOC_CTX *tmp_ctx = talloc_new(recdb);

        ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
                               CONTROL_TIMEOUT(), &outdata);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
                talloc_free(tmp_ctx);
                return -1;
        }

        reply = (struct ctdb_marshall_buffer *)outdata.dptr;

        /* the reply must at least contain the fixed part of the header */
        if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
                DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        rec = (struct ctdb_rec_data *)&reply->data[0];

        /* records are packed back to back; rec->length is the distance
           to the next one */
        for (i=0;
             i<reply->count;
             rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
                TDB_DATA key, data;
                struct ctdb_ltdb_header *hdr;
                TDB_DATA existing;

                /* rec->data holds the key immediately followed by the value */
                key.dptr = &rec->data[0];
                key.dsize = rec->keylen;
                data.dptr = &rec->data[key.dsize];
                data.dsize = rec->datalen;

                hdr = (struct ctdb_ltdb_header *)data.dptr;

                /* every value must start with a complete ltdb header */
                if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                        DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
                        talloc_free(tmp_ctx);
                        return -1;
                }

                /* fetch the existing record, if any */
                existing = tdb_fetch(recdb->tdb, key);

                if (existing.dptr != NULL) {
                        struct ctdb_ltdb_header header;
                        if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
                                DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
                                         (unsigned)existing.dsize, srcnode));
                                free(existing.dptr);
                                talloc_free(tmp_ctx);
                                return -1;
                        }
                        /* copy the header out so the fetched buffer can be
                           released with free() right away */
                        header = *(struct ctdb_ltdb_header *)existing.dptr;
                        free(existing.dptr);
                        /* keep the stored record unless the pulled one has a
                           higher rsn, or the same rsn while the stored copy's
                           dmaster is not the recovery master */
                        if (!(header.rsn < hdr->rsn ||
                              (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
                                continue;
                        }
                }

                if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
                        DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
                        talloc_free(tmp_ctx);
                        return -1;                              
                }
        }

        talloc_free(tmp_ctx);

        return 0;
}
604
605 /*
606   pull all the remote database contents into the recdb
607  */
608 static int pull_remote_database(struct ctdb_context *ctdb,
609                                 struct ctdb_recoverd *rec, 
610                                 struct ctdb_node_map *nodemap, 
611                                 struct tdb_wrap *recdb, uint32_t dbid,
612                                 bool persistent)
613 {
614         int j;
615
616         /* pull all records from all other nodes across onto this node
617            (this merges based on rsn)
618         */
619         for (j=0; j<nodemap->num; j++) {
620                 /* dont merge from nodes that are unavailable */
621                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
622                         continue;
623                 }
624                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
625                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
626                                  nodemap->nodes[j].pnn));
627                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
628                         return -1;
629                 }
630         }
631         
632         return 0;
633 }
634
635
636 /*
637   update flags on all active nodes
638  */
639 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
640 {
641         int ret;
642
643         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
644                 if (ret != 0) {
645                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
646                 return -1;
647         }
648
649         return 0;
650 }
651
652 /*
653   ensure all nodes have the same vnnmap we do
654  */
655 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
656                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
657 {
658         int j, ret;
659
660         /* push the new vnn map out to all the nodes */
661         for (j=0; j<nodemap->num; j++) {
662                 /* dont push to nodes that are unavailable */
663                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
664                         continue;
665                 }
666
667                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
668                 if (ret != 0) {
669                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
670                         return -1;
671                 }
672         }
673
674         return 0;
675 }
676
677
/*
  state of one in-progress vacuum fetch; entries are linked into
  rec->vacuum_info and unlinked by vacuum_info_destructor()
 */
struct vacuum_info {
        struct vacuum_info *next, *prev;    /* list linkage */
        struct ctdb_recoverd *rec;          /* owning recovery daemon state */
        uint32_t srcnode;                   /* node the records came from */
        struct ctdb_db_context *ctdb_db;    /* database being processed */
        /* marshalled records; recs->count is decremented as records are
           consumed by vacuum_fetch_next() */
        struct ctdb_marshall_buffer *recs;
        struct ctdb_rec_data *r;            /* next record to process */
};
686
687 static void vacuum_fetch_next(struct vacuum_info *v);
688
689 /*
690   called when a vacuum fetch has completed - just free it and do the next one
691  */
692 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
693 {
694         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
695         talloc_free(state);
696         vacuum_fetch_next(v);
697 }
698
699
700 /*
701   process the next element from the vacuum list
702 */
703 static void vacuum_fetch_next(struct vacuum_info *v)
704 {
705         struct ctdb_call call;
706         struct ctdb_rec_data *r;
707
708         while (v->recs->count) {
709                 struct ctdb_client_call_state *state;
710                 TDB_DATA data;
711                 struct ctdb_ltdb_header *hdr;
712
713                 ZERO_STRUCT(call);
714                 call.call_id = CTDB_NULL_FUNC;
715                 call.flags = CTDB_IMMEDIATE_MIGRATION;
716
717                 r = v->r;
718                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
719                 v->recs->count--;
720
721                 call.key.dptr = &r->data[0];
722                 call.key.dsize = r->keylen;
723
724                 /* ensure we don't block this daemon - just skip a record if we can't get
725                    the chainlock */
726                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
727                         continue;
728                 }
729
730                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
731                 if (data.dptr == NULL) {
732                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
733                         continue;
734                 }
735
736                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
737                         free(data.dptr);
738                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
739                         continue;
740                 }
741                 
742                 hdr = (struct ctdb_ltdb_header *)data.dptr;
743                 if (hdr->dmaster == v->rec->ctdb->pnn) {
744                         /* its already local */
745                         free(data.dptr);
746                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
747                         continue;
748                 }
749
750                 free(data.dptr);
751
752                 state = ctdb_call_send(v->ctdb_db, &call);
753                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
754                 if (state == NULL) {
755                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
756                         talloc_free(v);
757                         return;
758                 }
759                 state->async.fn = vacuum_fetch_callback;
760                 state->async.private_data = v;
761                 return;
762         }
763
764         talloc_free(v);
765 }
766
767
768 /*
769   destroy a vacuum info structure
770  */
771 static int vacuum_info_destructor(struct vacuum_info *v)
772 {
773         DLIST_REMOVE(v->rec->vacuum_info, v);
774         return 0;
775 }
776
777
778 /*
779   handler for vacuum fetch
780 */
781 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
782                                  TDB_DATA data, void *private_data)
783 {
784         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
785         struct ctdb_marshall_buffer *recs;
786         int ret, i;
787         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
788         const char *name;
789         struct ctdb_dbid_map *dbmap=NULL;
790         bool persistent = false;
791         struct ctdb_db_context *ctdb_db;
792         struct ctdb_rec_data *r;
793         uint32_t srcnode;
794         struct vacuum_info *v;
795
796         recs = (struct ctdb_marshall_buffer *)data.dptr;
797         r = (struct ctdb_rec_data *)&recs->data[0];
798
799         if (recs->count == 0) {
800                 talloc_free(tmp_ctx);
801                 return;
802         }
803
804         srcnode = r->reqid;
805
806         for (v=rec->vacuum_info;v;v=v->next) {
807                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
808                         /* we're already working on records from this node */
809                         talloc_free(tmp_ctx);
810                         return;
811                 }
812         }
813
814         /* work out if the database is persistent */
815         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
816         if (ret != 0) {
817                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
818                 talloc_free(tmp_ctx);
819                 return;
820         }
821
822         for (i=0;i<dbmap->num;i++) {
823                 if (dbmap->dbs[i].dbid == recs->db_id) {
824                         persistent = dbmap->dbs[i].persistent;
825                         break;
826                 }
827         }
828         if (i == dbmap->num) {
829                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
830                 talloc_free(tmp_ctx);
831                 return;         
832         }
833
834         /* find the name of this database */
835         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
836                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
837                 talloc_free(tmp_ctx);
838                 return;
839         }
840
841         /* attach to it */
842         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
843         if (ctdb_db == NULL) {
844                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
845                 talloc_free(tmp_ctx);
846                 return;
847         }
848
849         v = talloc_zero(rec, struct vacuum_info);
850         if (v == NULL) {
851                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
852                 talloc_free(tmp_ctx);
853                 return;
854         }
855
856         v->rec = rec;
857         v->srcnode = srcnode;
858         v->ctdb_db = ctdb_db;
859         v->recs = talloc_memdup(v, recs, data.dsize);
860         if (v->recs == NULL) {
861                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
862                 talloc_free(v);
863                 talloc_free(tmp_ctx);
864                 return;         
865         }
866         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
867
868         DLIST_ADD(rec->vacuum_info, v);
869
870         talloc_set_destructor(v, vacuum_info_destructor);
871
872         vacuum_fetch_next(v);
873         talloc_free(tmp_ctx);
874 }
875
876
/*
  timed event callback used by ctdb_wait_timeout: raise the flag that
  p points to so the waiting loop can stop
 */
static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te,
			      struct timeval yt, void *p)
{
	*(uint32_t *)p = 1;
}
886
887 /*
888   wait for a given number of seconds
889  */
890 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
891 {
892         uint32_t timed_out = 0;
893         time_t usecs = (secs - (time_t)secs) * 1000000;
894         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
895         while (!timed_out) {
896                 event_loop_once(ctdb->ev);
897         }
898 }
899
900 /*
901   called when an election times out (ends)
902  */
903 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
904                                   struct timeval t, void *p)
905 {
906         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
907         rec->election_timeout = NULL;
908         fast_start = false;
909
910         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
911 }
912
913
914 /*
915   wait for an election to finish. It finished election_timeout seconds after
916   the last election packet is received
917  */
918 static void ctdb_wait_election(struct ctdb_recoverd *rec)
919 {
920         struct ctdb_context *ctdb = rec->ctdb;
921         while (rec->election_timeout) {
922                 event_loop_once(ctdb->ev);
923         }
924 }
925
926 /*
927   Update our local flags from all remote connected nodes. 
928   This is only run when we are or we belive we are the recovery master
929  */
930 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
931 {
932         int j;
933         struct ctdb_context *ctdb = rec->ctdb;
934         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
935
936         /* get the nodemap for all active remote nodes and verify
937            they are the same as for this node
938          */
939         for (j=0; j<nodemap->num; j++) {
940                 struct ctdb_node_map *remote_nodemap=NULL;
941                 int ret;
942
943                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
944                         continue;
945                 }
946                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
947                         continue;
948                 }
949
950                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
951                                            mem_ctx, &remote_nodemap);
952                 if (ret != 0) {
953                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
954                                   nodemap->nodes[j].pnn));
955                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
956                         talloc_free(mem_ctx);
957                         return MONITOR_FAILED;
958                 }
959                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
960                         /* We should tell our daemon about this so it
961                            updates its flags or else we will log the same 
962                            message again in the next iteration of recovery.
963                            Since we are the recovery master we can just as
964                            well update the flags on all nodes.
965                         */
966                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
967                         if (ret != 0) {
968                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
969                                 return -1;
970                         }
971
972                         /* Update our local copy of the flags in the recovery
973                            daemon.
974                         */
975                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
976                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
977                                  nodemap->nodes[j].flags));
978                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
979                 }
980                 talloc_free(remote_nodemap);
981         }
982         talloc_free(mem_ctx);
983         return MONITOR_OK;
984 }
985
986
987 /* Create a new random generation ip. 
988    The generation id can not be the INVALID_GENERATION id
989 */
990 static uint32_t new_generation(void)
991 {
992         uint32_t generation;
993
994         while (1) {
995                 generation = random();
996
997                 if (generation != INVALID_GENERATION) {
998                         break;
999                 }
1000         }
1001
1002         return generation;
1003 }
1004
1005
1006 /*
1007   create a temporary working database
1008  */
1009 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1010 {
1011         char *name;
1012         struct tdb_wrap *recdb;
1013         unsigned tdb_flags;
1014
1015         /* open up the temporary recovery database */
1016         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1017                                ctdb->db_directory_state,
1018                                ctdb->pnn);
1019         if (name == NULL) {
1020                 return NULL;
1021         }
1022         unlink(name);
1023
1024         tdb_flags = TDB_NOLOCK;
1025         if (ctdb->valgrinding) {
1026                 tdb_flags |= TDB_NOMMAP;
1027         }
1028         tdb_flags |= TDB_DISALLOW_NESTING;
1029
1030         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1031                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1032         if (recdb == NULL) {
1033                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1034         }
1035
1036         talloc_free(name);
1037
1038         return recdb;
1039 }
1040
1041
1042 /* 
1043    a traverse function for pulling all relevent records from recdb
1044  */
1045 struct recdb_data {
1046         struct ctdb_context *ctdb;
1047         struct ctdb_marshall_buffer *recdata;
1048         uint32_t len;
1049         bool failed;
1050         bool persistent;
1051 };
1052
1053 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1054 {
1055         struct recdb_data *params = (struct recdb_data *)p;
1056         struct ctdb_rec_data *rec;
1057         struct ctdb_ltdb_header *hdr;
1058
1059         /* skip empty records */
1060         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1061                 return 0;
1062         }
1063
1064         /* update the dmaster field to point to us */
1065         hdr = (struct ctdb_ltdb_header *)data.dptr;
1066         if (!params->persistent) {
1067                 hdr->dmaster = params->ctdb->pnn;
1068         }
1069
1070         /* add the record to the blob ready to send to the nodes */
1071         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1072         if (rec == NULL) {
1073                 params->failed = true;
1074                 return -1;
1075         }
1076         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1077         if (params->recdata == NULL) {
1078                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1079                          rec->length + params->len, params->recdata->count));
1080                 params->failed = true;
1081                 return -1;
1082         }
1083         params->recdata->count++;
1084         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1085         params->len += rec->length;
1086         talloc_free(rec);
1087
1088         return 0;
1089 }
1090
1091 /*
1092   push the recdb database out to all nodes
1093  */
1094 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1095                                bool persistent,
1096                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1097 {
1098         struct recdb_data params;
1099         struct ctdb_marshall_buffer *recdata;
1100         TDB_DATA outdata;
1101         TALLOC_CTX *tmp_ctx;
1102         uint32_t *nodes;
1103
1104         tmp_ctx = talloc_new(ctdb);
1105         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1106
1107         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1108         CTDB_NO_MEMORY(ctdb, recdata);
1109
1110         recdata->db_id = dbid;
1111
1112         params.ctdb = ctdb;
1113         params.recdata = recdata;
1114         params.len = offsetof(struct ctdb_marshall_buffer, data);
1115         params.failed = false;
1116         params.persistent = persistent;
1117
1118         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1119                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1120                 talloc_free(params.recdata);
1121                 talloc_free(tmp_ctx);
1122                 return -1;
1123         }
1124
1125         if (params.failed) {
1126                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1127                 talloc_free(params.recdata);
1128                 talloc_free(tmp_ctx);
1129                 return -1;              
1130         }
1131
1132         recdata = params.recdata;
1133
1134         outdata.dptr = (void *)recdata;
1135         outdata.dsize = params.len;
1136
1137         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1138         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1139                                         nodes, 0,
1140                                         CONTROL_TIMEOUT(), false, outdata,
1141                                         NULL, NULL,
1142                                         NULL) != 0) {
1143                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1144                 talloc_free(recdata);
1145                 talloc_free(tmp_ctx);
1146                 return -1;
1147         }
1148
1149         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1150                   dbid, recdata->count));
1151
1152         talloc_free(recdata);
1153         talloc_free(tmp_ctx);
1154
1155         return 0;
1156 }
1157
1158
1159 /*
1160   go through a full recovery on one database 
1161  */
1162 static int recover_database(struct ctdb_recoverd *rec, 
1163                             TALLOC_CTX *mem_ctx,
1164                             uint32_t dbid,
1165                             bool persistent,
1166                             uint32_t pnn, 
1167                             struct ctdb_node_map *nodemap,
1168                             uint32_t transaction_id)
1169 {
1170         struct tdb_wrap *recdb;
1171         int ret;
1172         struct ctdb_context *ctdb = rec->ctdb;
1173         TDB_DATA data;
1174         struct ctdb_control_wipe_database w;
1175         uint32_t *nodes;
1176
1177         recdb = create_recdb(ctdb, mem_ctx);
1178         if (recdb == NULL) {
1179                 return -1;
1180         }
1181
1182         /* pull all remote databases onto the recdb */
1183         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1184         if (ret != 0) {
1185                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1186                 return -1;
1187         }
1188
1189         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1190
1191         /* wipe all the remote databases. This is safe as we are in a transaction */
1192         w.db_id = dbid;
1193         w.transaction_id = transaction_id;
1194
1195         data.dptr = (void *)&w;
1196         data.dsize = sizeof(w);
1197
1198         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1199         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1200                                         nodes, 0,
1201                                         CONTROL_TIMEOUT(), false, data,
1202                                         NULL, NULL,
1203                                         NULL) != 0) {
1204                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1205                 talloc_free(recdb);
1206                 return -1;
1207         }
1208         
1209         /* push out the correct database. This sets the dmaster and skips 
1210            the empty records */
1211         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1212         if (ret != 0) {
1213                 talloc_free(recdb);
1214                 return -1;
1215         }
1216
1217         /* all done with this database */
1218         talloc_free(recdb);
1219
1220         return 0;
1221 }
1222
1223 /*
1224   reload the nodes file 
1225 */
1226 static void reload_nodes_file(struct ctdb_context *ctdb)
1227 {
1228         ctdb->nodes = NULL;
1229         ctdb_load_nodes_file(ctdb);
1230 }
1231
1232 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1233                                          struct ctdb_recoverd *rec,
1234                                          struct ctdb_node_map *nodemap,
1235                                          uint32_t *culprit)
1236 {
1237         int j;
1238         int ret;
1239
1240         if (ctdb->num_nodes != nodemap->num) {
1241                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1242                                   ctdb->num_nodes, nodemap->num));
1243                 if (culprit) {
1244                         *culprit = ctdb->pnn;
1245                 }
1246                 return -1;
1247         }
1248
1249         for (j=0; j<nodemap->num; j++) {
1250                 /* release any existing data */
1251                 if (ctdb->nodes[j]->known_public_ips) {
1252                         talloc_free(ctdb->nodes[j]->known_public_ips);
1253                         ctdb->nodes[j]->known_public_ips = NULL;
1254                 }
1255                 if (ctdb->nodes[j]->available_public_ips) {
1256                         talloc_free(ctdb->nodes[j]->available_public_ips);
1257                         ctdb->nodes[j]->available_public_ips = NULL;
1258                 }
1259
1260                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1261                         continue;
1262                 }
1263
1264                 /* grab a new shiny list of public ips from the node */
1265                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1266                                         CONTROL_TIMEOUT(),
1267                                         ctdb->nodes[j]->pnn,
1268                                         ctdb->nodes,
1269                                         0,
1270                                         &ctdb->nodes[j]->known_public_ips);
1271                 if (ret != 0) {
1272                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1273                                 ctdb->nodes[j]->pnn));
1274                         if (culprit) {
1275                                 *culprit = ctdb->nodes[j]->pnn;
1276                         }
1277                         return -1;
1278                 }
1279
1280                 if (ctdb->tunable.disable_ip_failover == 0) {
1281                         if (rec->ip_check_disable_ctx == NULL) {
1282                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1283                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1284                                         rec->need_takeover_run = true;
1285                                 }
1286                         }
1287                 }
1288
1289                 /* grab a new shiny list of public ips from the node */
1290                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1291                                         CONTROL_TIMEOUT(),
1292                                         ctdb->nodes[j]->pnn,
1293                                         ctdb->nodes,
1294                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1295                                         &ctdb->nodes[j]->available_public_ips);
1296                 if (ret != 0) {
1297                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1298                                 ctdb->nodes[j]->pnn));
1299                         if (culprit) {
1300                                 *culprit = ctdb->nodes[j]->pnn;
1301                         }
1302                         return -1;
1303                 }
1304         }
1305
1306         return 0;
1307 }
1308
1309 /* when we start a recovery, make sure all nodes use the same reclock file
1310    setting
1311 */
1312 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1313 {
1314         struct ctdb_context *ctdb = rec->ctdb;
1315         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1316         TDB_DATA data;
1317         uint32_t *nodes;
1318
1319         if (ctdb->recovery_lock_file == NULL) {
1320                 data.dptr  = NULL;
1321                 data.dsize = 0;
1322         } else {
1323                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1324                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1325         }
1326
1327         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1328         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1329                                         nodes, 0,
1330                                         CONTROL_TIMEOUT(),
1331                                         false, data,
1332                                         NULL, NULL,
1333                                         rec) != 0) {
1334                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1335                 talloc_free(tmp_ctx);
1336                 return -1;
1337         }
1338
1339         talloc_free(tmp_ctx);
1340         return 0;
1341 }
1342
1343
1344 /*
1345   we are the recmaster, and recovery is needed - start a recovery run
1346  */
1347 static int do_recovery(struct ctdb_recoverd *rec, 
1348                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1349                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1350 {
1351         struct ctdb_context *ctdb = rec->ctdb;
1352         int i, j, ret;
1353         uint32_t generation;
1354         struct ctdb_dbid_map *dbmap;
1355         TDB_DATA data;
1356         uint32_t *nodes;
1357         struct timeval start_time;
1358         uint32_t culprit = (uint32_t)-1;
1359
1360         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1361
1362         /* if recovery fails, force it again */
1363         rec->need_recovery = true;
1364
1365         for (i=0; i<ctdb->num_nodes; i++) {
1366                 struct ctdb_banning_state *ban_state;
1367
1368                 if (ctdb->nodes[i]->ban_state == NULL) {
1369                         continue;
1370                 }
1371                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1372                 if (ban_state->count < 2*ctdb->num_nodes) {
1373                         continue;
1374                 }
1375                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1376                         ctdb->nodes[i]->pnn, ban_state->count,
1377                         ctdb->tunable.recovery_ban_period));
1378                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1379                 ban_state->count = 0;
1380         }
1381
1382
1383         if (ctdb->tunable.verify_recovery_lock != 0) {
1384                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1385                 start_time = timeval_current();
1386                 if (!ctdb_recovery_lock(ctdb, true)) {
1387                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1388                                          "and ban ourself for %u seconds\n",
1389                                          ctdb->tunable.recovery_ban_period));
1390                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1391                         return -1;
1392                 }
1393                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1394                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1395         }
1396
1397         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1398
1399         /* get a list of all databases */
1400         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1401         if (ret != 0) {
1402                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1403                 return -1;
1404         }
1405
1406         /* we do the db creation before we set the recovery mode, so the freeze happens
1407            on all databases we will be dealing with. */
1408
1409         /* verify that we have all the databases any other node has */
1410         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1411         if (ret != 0) {
1412                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1413                 return -1;
1414         }
1415
1416         /* verify that all other nodes have all our databases */
1417         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1418         if (ret != 0) {
1419                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1420                 return -1;
1421         }
1422         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1423
1424         /* update the database priority for all remote databases */
1425         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1426         if (ret != 0) {
1427                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1428         }
1429         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1430
1431
1432         /* update all other nodes to use the same setting for reclock files
1433            as the local recovery master.
1434         */
1435         sync_recovery_lock_file_across_cluster(rec);
1436
1437         /* set recovery mode to active on all nodes */
1438         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1439         if (ret != 0) {
1440                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1441                 return -1;
1442         }
1443
1444         /* execute the "startrecovery" event script on all nodes */
1445         ret = run_startrecovery_eventscript(rec, nodemap);
1446         if (ret!=0) {
1447                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1448                 return -1;
1449         }
1450
1451         /*
1452           update all nodes to have the same flags that we have
1453          */
1454         for (i=0;i<nodemap->num;i++) {
1455                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1456                         continue;
1457                 }
1458
1459                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1460                 if (ret != 0) {
1461                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1462                         return -1;
1463                 }
1464         }
1465
1466         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1467
1468         /* pick a new generation number */
1469         generation = new_generation();
1470
1471         /* change the vnnmap on this node to use the new generation 
1472            number but not on any other nodes.
1473            this guarantees that if we abort the recovery prematurely
1474            for some reason (a node stops responding?)
1475            that we can just return immediately and we will reenter
1476            recovery shortly again.
1477            I.e. we deliberately leave the cluster with an inconsistent
1478            generation id to allow us to abort recovery at any stage and
1479            just restart it from scratch.
1480          */
1481         vnnmap->generation = generation;
1482         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1483         if (ret != 0) {
1484                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1485                 return -1;
1486         }
1487
1488         data.dptr = (void *)&generation;
1489         data.dsize = sizeof(uint32_t);
1490
1491         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1492         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1493                                         nodes, 0,
1494                                         CONTROL_TIMEOUT(), false, data,
1495                                         NULL,
1496                                         transaction_start_fail_callback,
1497                                         rec) != 0) {
1498                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1499                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1500                                         nodes, 0,
1501                                         CONTROL_TIMEOUT(), false, tdb_null,
1502                                         NULL,
1503                                         NULL,
1504                                         NULL) != 0) {
1505                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1506                 }
1507                 return -1;
1508         }
1509
1510         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1511
1512         for (i=0;i<dbmap->num;i++) {
1513                 ret = recover_database(rec, mem_ctx,
1514                                        dbmap->dbs[i].dbid,
1515                                        dbmap->dbs[i].persistent,
1516                                        pnn, nodemap, generation);
1517                 if (ret != 0) {
1518                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1519                         return -1;
1520                 }
1521         }
1522
1523         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1524
1525         /* commit all the changes */
1526         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1527                                         nodes, 0,
1528                                         CONTROL_TIMEOUT(), false, data,
1529                                         NULL, NULL,
1530                                         NULL) != 0) {
1531                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1532                 return -1;
1533         }
1534
1535         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1536         
1537
1538         /* update the capabilities for all nodes */
1539         ret = update_capabilities(ctdb, nodemap);
1540         if (ret!=0) {
1541                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1542                 return -1;
1543         }
1544
1545         /* build a new vnn map with all the currently active and
1546            unbanned nodes */
1547         generation = new_generation();
1548         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1549         CTDB_NO_MEMORY(ctdb, vnnmap);
1550         vnnmap->generation = generation;
1551         vnnmap->size = 0;
1552         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1553         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1554         for (i=j=0;i<nodemap->num;i++) {
1555                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1556                         continue;
1557                 }
1558                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1559                         /* this node can not be an lmaster */
1560                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1561                         continue;
1562                 }
1563
1564                 vnnmap->size++;
1565                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1566                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1567                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1568
1569         }
1570         if (vnnmap->size == 0) {
1571                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1572                 vnnmap->size++;
1573                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1574                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1575                 vnnmap->map[0] = pnn;
1576         }       
1577
1578         /* update to the new vnnmap on all nodes */
1579         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1580         if (ret != 0) {
1581                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1582                 return -1;
1583         }
1584
1585         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1586
1587         /* update recmaster to point to us for all nodes */
1588         ret = set_recovery_master(ctdb, nodemap, pnn);
1589         if (ret!=0) {
1590                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1591                 return -1;
1592         }
1593
1594         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1595
1596         /*
1597           update all nodes to have the same flags that we have
1598          */
1599         for (i=0;i<nodemap->num;i++) {
1600                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1601                         continue;
1602                 }
1603
1604                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1605                 if (ret != 0) {
1606                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1607                         return -1;
1608                 }
1609         }
1610
1611         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1612
1613         /* disable recovery mode */
1614         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1615         if (ret != 0) {
1616                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1617                 return -1;
1618         }
1619
1620         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1621
1622         /*
1623           tell nodes to takeover their public IPs
1624          */
1625         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1626         if (ret != 0) {
1627                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1628                                  culprit));
1629                 return -1;
1630         }
1631         rec->need_takeover_run = false;
1632         ret = ctdb_takeover_run(ctdb, nodemap);
1633         if (ret != 0) {
1634                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1635                 return -1;
1636         }
1637         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1638
1639         /* execute the "recovered" event script on all nodes */
1640         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1641         if (ret!=0) {
1642                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1643                 return -1;
1644         }
1645
1646         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1647
1648         /* send a message to all clients telling them that the cluster 
1649            has been reconfigured */
1650         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1651
1652         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1653
1654         rec->need_recovery = false;
1655
1656         /* we managed to complete a full recovery, make sure to forgive
1657            any past sins by the nodes that could now participate in the
1658            recovery.
1659         */
1660         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1661         for (i=0;i<nodemap->num;i++) {
1662                 struct ctdb_banning_state *ban_state;
1663
1664                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1665                         continue;
1666                 }
1667
1668                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1669                 if (ban_state == NULL) {
1670                         continue;
1671                 }
1672
1673                 ban_state->count = 0;
1674         }
1675
1676
1677         /* We just finished a recovery successfully. 
1678            We now wait for rerecovery_timeout before we allow 
1679            another recovery to take place.
1680         */
1681         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1682         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1683         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1684
1685         return 0;
1686 }
1687
1688
1689 /*
1690   elections are won by first checking the number of connected nodes, then
1691   the priority time, then the pnn. send_election_request() transmits this struct as raw bytes and election_handler() casts the payload back, so the field order and sizes are part of the message format.
1692  */
1693 struct election_message {
1694         uint32_t num_connected;        /* nodes not flagged DISCONNECTED in the sender's nodemap; 0 if the sender lacks CTDB_CAP_RECMASTER */
1695         struct timeval priority_time;  /* sender's rec->priority_time; the current time if the sender lacks CTDB_CAP_RECMASTER */
1696         uint32_t pnn;                  /* pnn of the node that built the message */
1697         uint32_t node_flags;           /* the sender's own flags as read from its nodemap */
1698 };
1699
1700 /*
1701   form this nodes election data
1702  */
1703 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1704 {
1705         int ret, i;
1706         struct ctdb_node_map *nodemap;
1707         struct ctdb_context *ctdb = rec->ctdb;
1708
1709         ZERO_STRUCTP(em);
1710
1711         em->pnn = rec->ctdb->pnn;
1712         em->priority_time = rec->priority_time;
1713
1714         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1715         if (ret != 0) {
1716                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1717                 return;
1718         }
1719
1720         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1721         em->node_flags = rec->node_flags;
1722
1723         for (i=0;i<nodemap->num;i++) {
1724                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1725                         em->num_connected++;
1726                 }
1727         }
1728
1729         /* we shouldnt try to win this election if we cant be a recmaster */
1730         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1731                 em->num_connected = 0;
1732                 em->priority_time = timeval_current();
1733         }
1734
1735         talloc_free(nodemap);
1736 }
1737
1738 /*
1739   see if the given election data wins
1740  */
1741 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1742 {
1743         struct election_message myem;
1744         int cmp = 0;
1745
1746         ctdb_election_data(rec, &myem);
1747
1748         /* we cant win if we dont have the recmaster capability */
1749         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1750                 return false;
1751         }
1752
1753         /* we cant win if we are banned */
1754         if (rec->node_flags & NODE_FLAGS_BANNED) {
1755                 return false;
1756         }       
1757
1758         /* we cant win if we are stopped */
1759         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1760                 return false;
1761         }       
1762
1763         /* we will automatically win if the other node is banned */
1764         if (em->node_flags & NODE_FLAGS_BANNED) {
1765                 return true;
1766         }
1767
1768         /* we will automatically win if the other node is banned */
1769         if (em->node_flags & NODE_FLAGS_STOPPED) {
1770                 return true;
1771         }
1772
1773         /* try to use the most connected node */
1774         if (cmp == 0) {
1775                 cmp = (int)myem.num_connected - (int)em->num_connected;
1776         }
1777
1778         /* then the longest running node */
1779         if (cmp == 0) {
1780                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1781         }
1782
1783         if (cmp == 0) {
1784                 cmp = (int)myem.pnn - (int)em->pnn;
1785         }
1786
1787         return cmp > 0;
1788 }
1789
1790 /*
1791   send out an election request
1792  */
1793 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1794 {
1795         int ret;
1796         TDB_DATA election_data;
1797         struct election_message emsg;
1798         uint64_t srvid;
1799         struct ctdb_context *ctdb = rec->ctdb;
1800
1801         srvid = CTDB_SRVID_RECOVERY;
1802
1803         ctdb_election_data(rec, &emsg);
1804
1805         election_data.dsize = sizeof(struct election_message);
1806         election_data.dptr  = (unsigned char *)&emsg;
1807
1808
1809         /* send an election message to all active nodes */
1810         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1811         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1812
1813
1814         /* A new node that is already frozen has entered the cluster.
1815            The existing nodes are not frozen and dont need to be frozen
1816            until the election has ended and we start the actual recovery
1817         */
1818         if (update_recmaster == true) {
1819                 /* first we assume we will win the election and set 
1820                    recoverymaster to be ourself on the current node
1821                  */
1822                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1823                 if (ret != 0) {
1824                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1825                         return -1;
1826                 }
1827         }
1828
1829
1830         return 0;
1831 }
1832
1833 /*
1834   this function will unban all nodes in the cluster
1835 */
1836 static void unban_all_nodes(struct ctdb_context *ctdb)
1837 {
1838         int ret, i;
1839         struct ctdb_node_map *nodemap;
1840         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1841         
1842         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1843         if (ret != 0) {
1844                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1845                 return;
1846         }
1847
1848         for (i=0;i<nodemap->num;i++) {
1849                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1850                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1851                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1852                 }
1853         }
1854
1855         talloc_free(tmp_ctx);
1856 }
1857
1858
1859 /*
1860   we think we are winning the election - send a broadcast election request
1861  */
1862 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1863 {
1864         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1865         int ret;
1866
1867         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1868         if (ret != 0) {
1869                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1870         }
1871
1872         talloc_free(rec->send_election_te);
1873         rec->send_election_te = NULL;
1874 }
1875
1876 /*
1877   handler for memory dumps
1878 */
1879 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1880                              TDB_DATA data, void *private_data)
1881 {
1882         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1883         TDB_DATA *dump;
1884         int ret;
1885         struct rd_memdump_reply *rd;
1886
1887         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1888                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1889                 talloc_free(tmp_ctx);
1890                 return;
1891         }
1892         rd = (struct rd_memdump_reply *)data.dptr;
1893
1894         dump = talloc_zero(tmp_ctx, TDB_DATA);
1895         if (dump == NULL) {
1896                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1897                 talloc_free(tmp_ctx);
1898                 return;
1899         }
1900         ret = ctdb_dump_memory(ctdb, dump);
1901         if (ret != 0) {
1902                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1903                 talloc_free(tmp_ctx);
1904                 return;
1905         }
1906
1907 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1908
1909         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1910         if (ret != 0) {
1911                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1912                 talloc_free(tmp_ctx);
1913                 return;
1914         }
1915
1916         talloc_free(tmp_ctx);
1917 }
1918
1919 /*
1920   handler for reload_nodes
1921 */
1922 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1923                              TDB_DATA data, void *private_data)
1924 {
1925         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1926
1927         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1928
1929         reload_nodes_file(rec->ctdb);
1930 }
1931
1932
1933 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1934                               struct timeval yt, void *p)
1935 {
1936         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1937
1938         talloc_free(rec->ip_check_disable_ctx);
1939         rec->ip_check_disable_ctx = NULL;
1940 }
1941
1942
1943 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1944                              TDB_DATA data, void *private_data)
1945 {
1946         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1947         struct ctdb_public_ip *ip;
1948
1949         if (rec->recmaster != rec->ctdb->pnn) {
1950                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1951                 return;
1952         }
1953
1954         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1955                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1956                 return;
1957         }
1958
1959         ip = (struct ctdb_public_ip *)data.dptr;
1960
1961         update_ip_assignment_tree(rec->ctdb, ip);
1962 }
1963
1964
1965 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1966                              TDB_DATA data, void *private_data)
1967 {
1968         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1969         uint32_t timeout;
1970
1971         if (rec->ip_check_disable_ctx != NULL) {
1972                 talloc_free(rec->ip_check_disable_ctx);
1973                 rec->ip_check_disable_ctx = NULL;
1974         }
1975
1976         if (data.dsize != sizeof(uint32_t)) {
1977                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1978                                  "expexting %lu\n", (long unsigned)data.dsize,
1979                                  (long unsigned)sizeof(uint32_t)));
1980                 return;
1981         }
1982         if (data.dptr == NULL) {
1983                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
1984                 return;
1985         }
1986
1987         timeout = *((uint32_t *)data.dptr);
1988         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1989
1990         rec->ip_check_disable_ctx = talloc_new(rec);
1991         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1992
1993         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1994 }
1995
1996
1997 /*
1998   handler for ip reallocate, just add it to the list of callers and 
1999   handle this later in the monitor_cluster loop so we do not recurse
2000   with other callers to takeover_run()
2001 */
2002 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2003                              TDB_DATA data, void *private_data)
2004 {
2005         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2006         struct ip_reallocate_list *caller;
2007
2008         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2009                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2010                 return;
2011         }
2012
2013         if (rec->ip_reallocate_ctx == NULL) {
2014                 rec->ip_reallocate_ctx = talloc_new(rec);
2015                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2016         }
2017
2018         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2019         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2020
2021         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2022         caller->next = rec->reallocate_callers;
2023         rec->reallocate_callers = caller;
2024
2025         return;
2026 }
2027
2028 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2029 {
2030         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2031         TDB_DATA result;
2032         int32_t ret;
2033         struct ip_reallocate_list *callers;
2034         uint32_t culprit;
2035
2036         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2037
2038         /* update the list of public ips that a node can handle for
2039            all connected nodes
2040         */
2041         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2042         if (ret != 0) {
2043                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2044                                  culprit));
2045                 rec->need_takeover_run = true;
2046         }
2047         if (ret == 0) {
2048                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2049                 if (ret != 0) {
2050                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2051                                          culprit));
2052                         rec->need_takeover_run = true;
2053                 }
2054         }
2055
2056         result.dsize = sizeof(int32_t);
2057         result.dptr  = (uint8_t *)&ret;
2058
2059         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2060
2061                 /* Someone that sent srvid==0 does not want a reply */
2062                 if (callers->rd->srvid == 0) {
2063                         continue;
2064                 }
2065                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2066                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2067                                   (unsigned long long)callers->rd->srvid));
2068                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2069                 if (ret != 0) {
2070                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2071                                          "message to %u:%llu\n",
2072                                          (unsigned)callers->rd->pnn,
2073                                          (unsigned long long)callers->rd->srvid));
2074                 }
2075         }
2076
2077         talloc_free(tmp_ctx);
2078         talloc_free(rec->ip_reallocate_ctx);
2079         rec->ip_reallocate_ctx = NULL;
2080         rec->reallocate_callers = NULL;
2081         
2082 }
2083
2084
2085 /*
2086   handler for recovery master elections
2087 */
2088 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2089                              TDB_DATA data, void *private_data)
2090 {
2091         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2092         int ret;
2093         struct election_message *em = (struct election_message *)data.dptr;
2094         TALLOC_CTX *mem_ctx;
2095
2096         /* we got an election packet - update the timeout for the election */
2097         talloc_free(rec->election_timeout);
2098         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2099                                                 fast_start ?
2100                                                 timeval_current_ofs(0, 500000) :
2101                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2102                                                 ctdb_election_timeout, rec);
2103
2104         mem_ctx = talloc_new(ctdb);
2105
2106         /* someone called an election. check their election data
2107            and if we disagree and we would rather be the elected node, 
2108            send a new election message to all other nodes
2109          */
2110         if (ctdb_election_win(rec, em)) {
2111                 if (!rec->send_election_te) {
2112                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2113                                                                 timeval_current_ofs(0, 500000),
2114                                                                 election_send_request, rec);
2115                 }
2116                 talloc_free(mem_ctx);
2117                 /*unban_all_nodes(ctdb);*/
2118                 return;
2119         }
2120         
2121         /* we didn't win */
2122         talloc_free(rec->send_election_te);
2123         rec->send_election_te = NULL;
2124
2125         if (ctdb->tunable.verify_recovery_lock != 0) {
2126                 /* release the recmaster lock */
2127                 if (em->pnn != ctdb->pnn &&
2128                     ctdb->recovery_lock_fd != -1) {
2129                         close(ctdb->recovery_lock_fd);
2130                         ctdb->recovery_lock_fd = -1;
2131                         unban_all_nodes(ctdb);
2132                 }
2133         }
2134
2135         /* ok, let that guy become recmaster then */
2136         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2137         if (ret != 0) {
2138                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2139                 talloc_free(mem_ctx);
2140                 return;
2141         }
2142
2143         talloc_free(mem_ctx);
2144         return;
2145 }
2146
2147
2148 /*
2149   force the start of the election process
2150  */
2151 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2152                            struct ctdb_node_map *nodemap)
2153 {
2154         int ret;
2155         struct ctdb_context *ctdb = rec->ctdb;
2156
2157         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2158
2159         /* set all nodes to recovery mode to stop all internode traffic */
2160         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2161         if (ret != 0) {
2162                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2163                 return;
2164         }
2165
2166         talloc_free(rec->election_timeout);
2167         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2168                                                 fast_start ?
2169                                                 timeval_current_ofs(0, 500000) :
2170                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2171                                                 ctdb_election_timeout, rec);
2172
2173         ret = send_election_request(rec, pnn, true);
2174         if (ret!=0) {
2175                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2176                 return;
2177         }
2178
2179         /* wait for a few seconds to collect all responses */
2180         ctdb_wait_election(rec);
2181 }
2182
2183
2184
2185 /*
2186   handler for when a node changes its flags
2187 */
2188 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2189                             TDB_DATA data, void *private_data)
2190 {
2191         int ret;
2192         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2193         struct ctdb_node_map *nodemap=NULL;
2194         TALLOC_CTX *tmp_ctx;
2195         uint32_t changed_flags;
2196         int i;
2197         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2198         int disabled_flag_changed;
2199
2200         if (data.dsize != sizeof(*c)) {
2201                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2202                 return;
2203         }
2204
2205         tmp_ctx = talloc_new(ctdb);
2206         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2207
2208         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2209         if (ret != 0) {
2210                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2211                 talloc_free(tmp_ctx);
2212                 return;         
2213         }
2214
2215
2216         for (i=0;i<nodemap->num;i++) {
2217                 if (nodemap->nodes[i].pnn == c->pnn) break;
2218         }
2219
2220         if (i == nodemap->num) {
2221                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2222                 talloc_free(tmp_ctx);
2223                 return;
2224         }
2225
2226         changed_flags = c->old_flags ^ c->new_flags;
2227
2228         if (nodemap->nodes[i].flags != c->new_flags) {
2229                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2230         }
2231
2232         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2233
2234         nodemap->nodes[i].flags = c->new_flags;
2235
2236         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2237                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2238
2239         if (ret == 0) {
2240                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2241                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2242         }
2243         
2244         if (ret == 0 &&
2245             ctdb->recovery_master == ctdb->pnn &&
2246             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2247                 /* Only do the takeover run if the perm disabled or unhealthy
2248                    flags changed since these will cause an ip failover but not
2249                    a recovery.
2250                    If the node became disconnected or banned this will also
2251                    lead to an ip address failover but that is handled 
2252                    during recovery
2253                 */
2254                 if (disabled_flag_changed) {
2255                         rec->need_takeover_run = true;
2256                 }
2257         }
2258
2259         talloc_free(tmp_ctx);
2260 }
2261
2262 /*
2263   handler for when we need to push out flag changes ot all other nodes
2264 */
2265 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2266                             TDB_DATA data, void *private_data)
2267 {
2268         int ret;
2269         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2270         struct ctdb_node_map *nodemap=NULL;
2271         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2272         uint32_t recmaster;
2273         uint32_t *nodes;
2274
2275         /* find the recovery master */
2276         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2277         if (ret != 0) {
2278                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2279                 talloc_free(tmp_ctx);
2280                 return;
2281         }
2282
2283         /* read the node flags from the recmaster */
2284         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2285         if (ret != 0) {
2286                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2287                 talloc_free(tmp_ctx);
2288                 return;
2289         }
2290         if (c->pnn >= nodemap->num) {
2291                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2292                 talloc_free(tmp_ctx);
2293                 return;
2294         }
2295
2296         /* send the flags update to all connected nodes */
2297         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2298
2299         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2300                                       nodes, 0, CONTROL_TIMEOUT(),
2301                                       false, data,
2302                                       NULL, NULL,
2303                                       NULL) != 0) {
2304                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2305
2306                 talloc_free(tmp_ctx);
2307                 return;
2308         }
2309
2310         talloc_free(tmp_ctx);
2311 }
2312
2313
2314 struct verify_recmode_normal_data { /* state shared between verify_recmode() and verify_recmode_normal_callback() */
2315         uint32_t count;              /* decremented by the callback each time a node's getrecmode control completes */
2316         enum monitor_result status;  /* starts as MONITOR_OK; the callback changes it to MONITOR_FAILED or MONITOR_RECOVERY_NEEDED */
2317 };
2318
2319 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2320 {
2321         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2322
2323
2324         /* one more node has responded with recmode data*/
2325         rmdata->count--;
2326
2327         /* if we failed to get the recmode, then return an error and let
2328            the main loop try again.
2329         */
2330         if (state->state != CTDB_CONTROL_DONE) {
2331                 if (rmdata->status == MONITOR_OK) {
2332                         rmdata->status = MONITOR_FAILED;
2333                 }
2334                 return;
2335         }
2336
2337         /* if we got a response, then the recmode will be stored in the
2338            status field
2339         */
2340         if (state->status != CTDB_RECOVERY_NORMAL) {
2341                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2342                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2343         }
2344
2345         return;
2346 }
2347
2348
2349 /* verify that all nodes are in normal recovery mode */
2350 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2351 {
2352         struct verify_recmode_normal_data *rmdata;
2353         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2354         struct ctdb_client_control_state *state;
2355         enum monitor_result status;
2356         int j;
2357         
2358         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2359         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2360         rmdata->count  = 0;
2361         rmdata->status = MONITOR_OK;
2362
2363         /* loop over all active nodes and send an async getrecmode call to 
2364            them*/
2365         for (j=0; j<nodemap->num; j++) {
2366                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2367                         continue;
2368                 }
2369                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2370                                         CONTROL_TIMEOUT(), 
2371                                         nodemap->nodes[j].pnn);
2372                 if (state == NULL) {
2373                         /* we failed to send the control, treat this as 
2374                            an error and try again next iteration
2375                         */                      
2376                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2377                         talloc_free(mem_ctx);
2378                         return MONITOR_FAILED;
2379                 }
2380
2381                 /* set up the callback functions */
2382                 state->async.fn = verify_recmode_normal_callback;
2383                 state->async.private_data = rmdata;
2384
2385                 /* one more control to wait for to complete */
2386                 rmdata->count++;
2387         }
2388
2389
2390         /* now wait for up to the maximum number of seconds allowed
2391            or until all nodes we expect a response from has replied
2392         */
2393         while (rmdata->count > 0) {
2394                 event_loop_once(ctdb->ev);
2395         }
2396
2397         status = rmdata->status;
2398         talloc_free(mem_ctx);
2399         return status;
2400 }
2401
2402
2403 struct verify_recmaster_data {
2404         struct ctdb_recoverd *rec;
2405         uint32_t count;
2406         uint32_t pnn;
2407         enum monitor_result status;
2408 };
2409
2410 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2411 {
2412         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2413
2414
2415         /* one more node has responded with recmaster data*/
2416         rmdata->count--;
2417
2418         /* if we failed to get the recmaster, then return an error and let
2419            the main loop try again.
2420         */
2421         if (state->state != CTDB_CONTROL_DONE) {
2422                 if (rmdata->status == MONITOR_OK) {
2423                         rmdata->status = MONITOR_FAILED;
2424                 }
2425                 return;
2426         }
2427
2428         /* if we got a response, then the recmaster will be stored in the
2429            status field
2430         */
2431         if (state->status != rmdata->pnn) {
2432                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2433                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2434                 rmdata->status = MONITOR_ELECTION_NEEDED;
2435         }
2436
2437         return;
2438 }
2439
2440
2441 /* verify that all nodes agree that we are the recmaster */
2442 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2443 {
2444         struct ctdb_context *ctdb = rec->ctdb;
2445         struct verify_recmaster_data *rmdata;
2446         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2447         struct ctdb_client_control_state *state;
2448         enum monitor_result status;
2449         int j;
2450         
2451         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2452         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2453         rmdata->rec    = rec;
2454         rmdata->count  = 0;
2455         rmdata->pnn    = pnn;
2456         rmdata->status = MONITOR_OK;
2457
2458         /* loop over all active nodes and send an async getrecmaster call to 
2459            them*/
2460         for (j=0; j<nodemap->num; j++) {
2461                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2462                         continue;
2463                 }
2464                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2465                                         CONTROL_TIMEOUT(),
2466                                         nodemap->nodes[j].pnn);
2467                 if (state == NULL) {
2468                         /* we failed to send the control, treat this as 
2469                            an error and try again next iteration
2470                         */                      
2471                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2472                         talloc_free(mem_ctx);
2473                         return MONITOR_FAILED;
2474                 }
2475
2476                 /* set up the callback functions */
2477                 state->async.fn = verify_recmaster_callback;
2478                 state->async.private_data = rmdata;
2479
2480                 /* one more control to wait for to complete */
2481                 rmdata->count++;
2482         }
2483
2484
2485         /* now wait for up to the maximum number of seconds allowed
2486            or until all nodes we expect a response from has replied
2487         */
2488         while (rmdata->count > 0) {
2489                 event_loop_once(ctdb->ev);
2490         }
2491
2492         status = rmdata->status;
2493         talloc_free(mem_ctx);
2494         return status;
2495 }
2496
2497
2498 /* called to check that the local allocation of public ip addresses is ok.
2499 */
2500 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2501 {
2502         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2503         struct ctdb_control_get_ifaces *ifaces = NULL;
2504         struct ctdb_all_public_ips *ips = NULL;
2505         struct ctdb_uptime *uptime1 = NULL;
2506         struct ctdb_uptime *uptime2 = NULL;
2507         int ret, j;
2508         bool need_iface_check = false;
2509         bool need_takeover_run = false;
2510
2511         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2512                                 CTDB_CURRENT_NODE, &uptime1);
2513         if (ret != 0) {
2514                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2515                 talloc_free(mem_ctx);
2516                 return -1;
2517         }
2518
2519
2520         /* read the interfaces from the local node */
2521         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2522         if (ret != 0) {
2523                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2524                 talloc_free(mem_ctx);
2525                 return -1;
2526         }
2527
2528         if (!rec->ifaces) {
2529                 need_iface_check = true;
2530         } else if (rec->ifaces->num != ifaces->num) {
2531                 need_iface_check = true;
2532         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2533                 need_iface_check = true;
2534         }
2535
2536         if (need_iface_check) {
2537                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2538                                      "local node %u - force takeover run\n",
2539                                      pnn));
2540                 need_takeover_run = true;
2541         }
2542
2543         /* read the ip allocation from the local node */
2544         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2545         if (ret != 0) {
2546                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2547                 talloc_free(mem_ctx);
2548                 return -1;
2549         }
2550
2551         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2552                                 CTDB_CURRENT_NODE, &uptime2);
2553         if (ret != 0) {
2554                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2555                 talloc_free(mem_ctx);
2556                 return -1;
2557         }
2558
2559         /* skip the check if the startrecovery time has changed */
2560         if (timeval_compare(&uptime1->last_recovery_started,
2561                             &uptime2->last_recovery_started) != 0) {
2562                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2563                 talloc_free(mem_ctx);
2564                 return 0;
2565         }
2566
2567         /* skip the check if the endrecovery time has changed */
2568         if (timeval_compare(&uptime1->last_recovery_finished,
2569                             &uptime2->last_recovery_finished) != 0) {
2570                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2571                 talloc_free(mem_ctx);
2572                 return 0;
2573         }
2574
2575         /* skip the check if we have started but not finished recovery */
2576         if (timeval_compare(&uptime1->last_recovery_finished,
2577                             &uptime1->last_recovery_started) != 1) {
2578                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2579                 talloc_free(mem_ctx);
2580
2581                 return 0;
2582         }
2583
2584         talloc_free(rec->ifaces);
2585         rec->ifaces = talloc_steal(rec, ifaces);
2586
2587         /* verify that we have the ip addresses we should have
2588            and we dont have ones we shouldnt have.
2589            if we find an inconsistency we set recmode to
2590            active on the local node and wait for the recmaster
2591            to do a full blown recovery.
2592            also if the pnn is -1 and we are healthy and can host the ip
2593            we also request a ip reallocation.
2594         */
2595         if (ctdb->tunable.disable_ip_failover == 0) {
2596                 for (j=0; j<ips->num; j++) {
2597                         if (ips->ips[j].pnn == -1 && nodemap->nodes[pnn].flags == 0) {
2598                                 DEBUG(DEBUG_CRIT,("Public address '%s' is not assigned and we could serve this ip\n",
2599                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2600                                 need_takeover_run = true;
2601                         } else if (ips->ips[j].pnn == pnn) {
2602                                 if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2603                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2604                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2605                                         need_takeover_run = true;
2606                                 }
2607                         } else {
2608                                 if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2609                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2610                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2611                                         need_takeover_run = true;
2612                                 }
2613                         }
2614                 }
2615         }
2616
2617         if (need_takeover_run) {
2618                 struct takeover_run_reply rd;
2619                 TDB_DATA data;
2620
2621                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2622
2623                 rd.pnn = ctdb->pnn;
2624                 rd.srvid = 0;
2625                 data.dptr = (uint8_t *)&rd;
2626                 data.dsize = sizeof(rd);
2627
2628                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2629                 if (ret != 0) {
2630                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2631                 }
2632         }
2633         talloc_free(mem_ctx);
2634         return 0;
2635 }
2636
2637
2638 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2639 {
2640         struct ctdb_node_map **remote_nodemaps = callback_data;
2641
2642         if (node_pnn >= ctdb->num_nodes) {
2643                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2644                 return;
2645         }
2646
2647         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2648
2649 }
2650
2651 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2652         struct ctdb_node_map *nodemap,
2653         struct ctdb_node_map **remote_nodemaps)
2654 {
2655         uint32_t *nodes;
2656
2657         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2658         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2659                                         nodes, 0,
2660                                         CONTROL_TIMEOUT(), false, tdb_null,
2661                                         async_getnodemap_callback,
2662                                         NULL,
2663                                         remote_nodemaps) != 0) {
2664                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2665
2666                 return -1;
2667         }
2668
2669         return 0;
2670 }
2671
/* outcome of one check of the recovery lock file */
enum reclock_child_status {
        RECLOCK_CHECKING,       /* no result yet */
        RECLOCK_OK,             /* the child reported success */
        RECLOCK_FAILED,         /* the child reported an error */
        RECLOCK_TIMEOUT         /* the timeout fired before a result arrived */
};

/* state of one check_recovery_lock() run */
struct ctdb_check_reclock_state {
        struct ctdb_context *ctdb;

        /* when the check was started, used to report the latency */
        struct timeval start_time;

        /* pipe between child and parent: the parent reads from fd[0],
           the child writes to fd[1]
        */
        int fd[2];

        /* pid of the forked child */
        pid_t child;

        /* timeout for the child */
        struct timed_event *te;

        /* read event on fd[0] */
        struct fd_event *fde;

        enum reclock_child_status status;
};
2682
2683 /* when we free the reclock state we must kill any child process.
2684 */
2685 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2686 {
2687         struct ctdb_context *ctdb = state->ctdb;
2688
2689         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2690
2691         if (state->fd[0] != -1) {
2692                 close(state->fd[0]);
2693                 state->fd[0] = -1;
2694         }
2695         if (state->fd[1] != -1) {
2696                 close(state->fd[1]);
2697                 state->fd[1] = -1;
2698         }
2699         kill(state->child, SIGKILL);
2700         return 0;
2701 }
2702
2703 /*
2704   called if our check_reclock child times out. this would happen if
2705   i/o to the reclock file blocks.
2706  */
2707 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2708                                          struct timeval t, void *private_data)
2709 {
2710         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2711                                            struct ctdb_check_reclock_state);
2712
2713         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2714         state->status = RECLOCK_TIMEOUT;
2715 }
2716
2717 /* this is called when the child process has completed checking the reclock
2718    file and has written data back to us through the pipe.
2719 */
2720 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2721                              uint16_t flags, void *private_data)
2722 {
2723         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2724                                              struct ctdb_check_reclock_state);
2725         char c = 0;
2726         int ret;
2727
2728         /* we got a response from our child process so we can abort the
2729            timeout.
2730         */
2731         talloc_free(state->te);
2732         state->te = NULL;
2733
2734         ret = read(state->fd[0], &c, 1);
2735         if (ret != 1 || c != RECLOCK_OK) {
2736                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2737                 state->status = RECLOCK_FAILED;
2738
2739                 return;
2740         }
2741
2742         state->status = RECLOCK_OK;
2743         return;
2744 }
2745
2746 static int check_recovery_lock(struct ctdb_context *ctdb)
2747 {
2748         int ret;
2749         struct ctdb_check_reclock_state *state;
2750         pid_t parent = getpid();
2751
2752         if (ctdb->recovery_lock_fd == -1) {
2753                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2754                 return -1;
2755         }
2756
2757         state = talloc(ctdb, struct ctdb_check_reclock_state);
2758         CTDB_NO_MEMORY(ctdb, state);
2759
2760         state->ctdb = ctdb;
2761         state->start_time = timeval_current();
2762         state->status = RECLOCK_CHECKING;
2763         state->fd[0] = -1;
2764         state->fd[1] = -1;
2765
2766         ret = pipe(state->fd);
2767         if (ret != 0) {
2768                 talloc_free(state);
2769                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2770                 return -1;
2771         }
2772
2773         state->child = fork();
2774         if (state->child == (pid_t)-1) {
2775                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2776                 close(state->fd[0]);
2777                 state->fd[0] = -1;
2778                 close(state->fd[1]);
2779                 state->fd[1] = -1;
2780                 talloc_free(state);
2781                 return -1;
2782         }
2783
2784         if (state->child == 0) {
2785                 char cc = RECLOCK_OK;
2786                 close(state->fd[0]);
2787                 state->fd[0] = -1;
2788
2789                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
2790                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2791                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2792                         cc = RECLOCK_FAILED;
2793                 }
2794
2795                 write(state->fd[1], &cc, 1);
2796                 /* make sure we die when our parent dies */
2797                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2798                         sleep(5);
2799                         write(state->fd[1], &cc, 1);
2800                 }
2801                 _exit(0);
2802         }
2803         close(state->fd[1]);
2804         state->fd[1] = -1;
2805         set_close_on_exec(state->fd[0]);
2806
2807         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2808
2809         talloc_set_destructor(state, check_reclock_destructor);
2810
2811         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2812                                     ctdb_check_reclock_timeout, state);
2813         if (state->te == NULL) {
2814                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2815                 talloc_free(state);
2816                 return -1;
2817         }
2818
2819         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2820                                 EVENT_FD_READ,
2821                                 reclock_child_handler,
2822                                 (void *)state);
2823
2824         if (state->fde == NULL) {
2825                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2826                 talloc_free(state);
2827                 return -1;
2828         }
2829         tevent_fd_set_auto_close(state->fde);
2830
2831         while (state->status == RECLOCK_CHECKING) {
2832                 event_loop_once(ctdb->ev);
2833         }
2834
2835         if (state->status == RECLOCK_FAILED) {
2836                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2837                 close(ctdb->recovery_lock_fd);
2838                 ctdb->recovery_lock_fd = -1;
2839                 talloc_free(state);
2840                 return -1;
2841         }
2842
2843         talloc_free(state);
2844         return 0;
2845 }
2846
2847 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2848 {
2849         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2850         const char *reclockfile;
2851
2852         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2853                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2854                 talloc_free(tmp_ctx);
2855                 return -1;      
2856         }
2857
2858         if (reclockfile == NULL) {
2859                 if (ctdb->recovery_lock_file != NULL) {
2860                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2861                         talloc_free(ctdb->recovery_lock_file);
2862                         ctdb->recovery_lock_file = NULL;
2863                         if (ctdb->recovery_lock_fd != -1) {
2864                                 close(ctdb->recovery_lock_fd);
2865                                 ctdb->recovery_lock_fd = -1;
2866                         }
2867                 }
2868                 ctdb->tunable.verify_recovery_lock = 0;
2869                 talloc_free(tmp_ctx);
2870                 return 0;
2871         }
2872
2873         if (ctdb->recovery_lock_file == NULL) {
2874                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2875                 if (ctdb->recovery_lock_fd != -1) {
2876                         close(ctdb->recovery_lock_fd);
2877                         ctdb->recovery_lock_fd = -1;
2878                 }
2879                 talloc_free(tmp_ctx);
2880                 return 0;
2881         }
2882
2883
2884         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2885                 talloc_free(tmp_ctx);
2886                 return 0;
2887         }
2888
2889         talloc_free(ctdb->recovery_lock_file);
2890         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2891         ctdb->tunable.verify_recovery_lock = 0;
2892         if (ctdb->recovery_lock_fd != -1) {
2893                 close(ctdb->recovery_lock_fd);
2894                 ctdb->recovery_lock_fd = -1;
2895         }
2896
2897         talloc_free(tmp_ctx);
2898         return 0;
2899 }
2900
2901 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2902                       TALLOC_CTX *mem_ctx)
2903 {
2904         uint32_t pnn;
2905         struct ctdb_node_map *nodemap=NULL;
2906         struct ctdb_node_map *recmaster_nodemap=NULL;
2907         struct ctdb_node_map **remote_nodemaps=NULL;
2908         struct ctdb_vnn_map *vnnmap=NULL;
2909         struct ctdb_vnn_map *remote_vnnmap=NULL;
2910         int32_t debug_level;
2911         int i, j, ret;
2912
2913
2914
2915         /* verify that the main daemon is still running */
2916         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2917                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2918                 exit(-1);
2919         }
2920
2921         /* ping the local daemon to tell it we are alive */
2922         ctdb_ctrl_recd_ping(ctdb);
2923
2924         if (rec->election_timeout) {
2925                 /* an election is in progress */
2926                 return;
2927         }
2928
2929         /* read the debug level from the parent and update locally */
2930         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2931         if (ret !=0) {
2932                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2933                 return;
2934         }
2935         LogLevel = debug_level;
2936
2937
2938         /* We must check if we need to ban a node here but we want to do this
2939            as early as possible so we dont wait until we have pulled the node
2940            map from the local node. thats why we have the hardcoded value 20
2941         */
2942         for (i=0; i<ctdb->num_nodes; i++) {
2943                 struct ctdb_banning_state *ban_state;
2944
2945                 if (ctdb->nodes[i]->ban_state == NULL) {
2946                         continue;
2947                 }
2948                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2949                 if (ban_state->count < 20) {
2950                         continue;
2951                 }
2952                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2953                         ctdb->nodes[i]->pnn, ban_state->count,
2954                         ctdb->tunable.recovery_ban_period));
2955                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2956                 ban_state->count = 0;
2957         }
2958
2959         /* get relevant tunables */
2960         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2961         if (ret != 0) {
2962                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2963                 return;
2964         }
2965
2966         /* get the current recovery lock file from the server */
2967         if (update_recovery_lock_file(ctdb) != 0) {
2968                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2969                 return;
2970         }
2971
2972         /* Make sure that if recovery lock verification becomes disabled when
2973            we close the file
2974         */
2975         if (ctdb->tunable.verify_recovery_lock == 0) {
2976                 if (ctdb->recovery_lock_fd != -1) {
2977                         close(ctdb->recovery_lock_fd);
2978                         ctdb->recovery_lock_fd = -1;
2979                 }
2980         }
2981
2982         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2983         if (pnn == (uint32_t)-1) {
2984                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2985                 return;
2986         }
2987
2988         /* get the vnnmap */
2989         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2990         if (ret != 0) {
2991                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2992                 return;
2993         }
2994
2995
2996         /* get number of nodes */
2997         if (rec->nodemap) {
2998                 talloc_free(rec->nodemap);
2999                 rec->nodemap = NULL;
3000                 nodemap=NULL;
3001         }
3002         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3003         if (ret != 0) {
3004                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3005                 return;
3006         }
3007         nodemap = rec->nodemap;
3008
3009         /* check which node is the recovery master */
3010         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3011         if (ret != 0) {
3012                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3013                 return;
3014         }
3015
3016         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3017         if (rec->recmaster != pnn) {
3018                 if (rec->ip_reallocate_ctx != NULL) {
3019                         talloc_free(rec->ip_reallocate_ctx);
3020                         rec->ip_reallocate_ctx = NULL;
3021                         rec->reallocate_callers = NULL;
3022                 }
3023         }
3024
3025         if (rec->recmaster == (uint32_t)-1) {
3026                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3027                 force_election(rec, pnn, nodemap);
3028                 return;
3029         }
3030
3031
3032         /* if the local daemon is STOPPED, we verify that the databases are
3033            also frozen and thet the recmode is set to active 
3034         */
3035         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3036                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3037                 if (ret != 0) {
3038                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3039                 }
3040                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3041                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3042
3043                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3044                         if (ret != 0) {
3045                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3046                                 return;
3047                         }
3048                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3049                         if (ret != 0) {
3050                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3051
3052                                 return;
3053                         }
3054                         return;
3055                 }
3056         }
3057         /* If the local node is stopped, verify we are not the recmaster 
3058            and yield this role if so
3059         */
3060         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3061                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3062                 force_election(rec, pnn, nodemap);
3063                 return;
3064         }
3065         
3066         /* check that we (recovery daemon) and the local ctdb daemon
3067            agrees on whether we are banned or not
3068         */
3069 //qqq
3070
3071         /* remember our own node flags */
3072         rec->node_flags = nodemap->nodes[pnn].flags;
3073
3074         /* count how many active nodes there are */
3075         rec->num_active    = 0;
3076         rec->num_connected = 0;
3077         for (i=0; i<nodemap->num; i++) {
3078                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3079                         rec->num_active++;
3080                 }
3081                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3082                         rec->num_connected++;
3083                 }
3084         }
3085
3086
3087         /* verify that the recmaster node is still active */
3088         for (j=0; j<nodemap->num; j++) {
3089                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3090                         break;
3091                 }
3092         }
3093
3094         if (j == nodemap->num) {
3095                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3096                 force_election(rec, pnn, nodemap);
3097                 return;
3098         }
3099
3100         /* if recovery master is disconnected we must elect a new recmaster */
3101         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3102                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3103                 force_election(rec, pnn, nodemap);
3104                 return;
3105         }
3106
3107         /* grab the nodemap from the recovery master to check if it is banned */
3108         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3109                                    mem_ctx, &recmaster_nodemap);
3110         if (ret != 0) {
3111                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3112                           nodemap->nodes[j].pnn));
3113                 return;
3114         }
3115
3116
3117         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3118                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3119                 force_election(rec, pnn, nodemap);
3120                 return;
3121         }
3122
3123
3124         /* verify that we have all ip addresses we should have and we dont
3125          * have addresses we shouldnt have.
3126          */ 
3127         if (ctdb->tunable.disable_ip_failover == 0) {
3128                 if (rec->ip_check_disable_ctx == NULL) {
3129                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3130                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3131                         }
3132                 }
3133         }
3134
3135
3136         /* if we are not the recmaster then we do not need to check
3137            if recovery is needed
3138          */
3139         if (pnn != rec->recmaster) {
3140                 return;
3141         }
3142
3143
3144         /* ensure our local copies of flags are right */
3145         ret = update_local_flags(rec, nodemap);
3146         if (ret == MONITOR_ELECTION_NEEDED) {
3147                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3148                 force_election(rec, pnn, nodemap);
3149                 return;
3150         }
3151         if (ret != MONITOR_OK) {
3152                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3153                 return;
3154         }
3155
3156         if (ctdb->num_nodes != nodemap->num) {
3157                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3158                 reload_nodes_file(ctdb);
3159                 return;
3160         }
3161
3162         /* verify that all active nodes agree that we are the recmaster */
3163         switch (verify_recmaster(rec, nodemap, pnn)) {
3164         case MONITOR_RECOVERY_NEEDED:
3165                 /* can not happen */
3166                 return;
3167         case MONITOR_ELECTION_NEEDED:
3168                 force_election(rec, pnn, nodemap);
3169                 return;
3170         case MONITOR_OK:
3171                 break;
3172         case MONITOR_FAILED:
3173                 return;
3174         }
3175
3176
3177         if (rec->need_recovery) {
3178                 /* a previous recovery didn't finish */
3179                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3180                 return;
3181         }
3182
3183         /* verify that all active nodes are in normal mode 
3184            and not in recovery mode 
3185         */
3186         switch (verify_recmode(ctdb, nodemap)) {
3187         case MONITOR_RECOVERY_NEEDED:
3188                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3189                 return;
3190         case MONITOR_FAILED:
3191                 return;
3192         case MONITOR_ELECTION_NEEDED:
3193                 /* can not happen */
3194         case MONITOR_OK:
3195                 break;
3196         }
3197
3198
3199         if (ctdb->tunable.verify_recovery_lock != 0) {
3200                 /* we should have the reclock - check its not stale */
3201                 ret = check_recovery_lock(ctdb);
3202                 if (ret != 0) {
3203                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3204                         ctdb_set_culprit(rec, ctdb->pnn);
3205                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3206                         return;
3207                 }
3208         }
3209
3210         /* if there are takeovers requested, perform it and notify the waiters */
3211         if (rec->reallocate_callers) {
3212                 process_ipreallocate_requests(ctdb, rec);
3213         }
3214
3215         /* get the nodemap for all active remote nodes
3216          */
3217         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3218         if (remote_nodemaps == NULL) {
3219                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3220                 return;
3221         }
3222         for(i=0; i<nodemap->num; i++) {
3223                 remote_nodemaps[i] = NULL;
3224         }
3225         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3226                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3227                 return;
3228         } 
3229
3230         /* verify that all other nodes have the same nodemap as we have
3231         */
3232         for (j=0; j<nodemap->num; j++) {
3233                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3234                         continue;
3235                 }
3236
3237                 if (remote_nodemaps[j] == NULL) {
3238                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3239                         ctdb_set_culprit(rec, j);
3240
3241                         return;
3242                 }
3243
3244                 /* if the nodes disagree on how many nodes there are
3245                    then this is a good reason to try recovery
3246                  */
3247                 if (remote_nodemaps[j]->num != nodemap->num) {
3248                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3249                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3250                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3251                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3252                         return;
3253                 }
3254
3255                 /* if the nodes disagree on which nodes exist and are
3256                    active, then that is also a good reason to do recovery
3257                  */
3258                 for (i=0;i<nodemap->num;i++) {
3259                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3260                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3261                                           nodemap->nodes[j].pnn, i, 
3262                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3263                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3264                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3265                                             vnnmap);
3266                                 return;
3267                         }
3268                 }
3269
3270                 /* verify the flags are consistent
3271                 */
3272                 for (i=0; i<nodemap->num; i++) {
3273                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3274                                 continue;
3275                         }
3276                         
3277                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3278                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3279                                   nodemap->nodes[j].pnn, 
3280                                   nodemap->nodes[i].pnn, 
3281                                   remote_nodemaps[j]->nodes[i].flags,
3282                                   nodemap->nodes[j].flags));
3283                                 if (i == j) {
3284                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3285                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3286                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3287                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3288                                                     vnnmap);
3289                                         return;
3290                                 } else {
3291                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3292                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3293                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3294                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3295                                                     vnnmap);
3296                                         return;
3297                                 }
3298                         }
3299                 }
3300         }
3301
3302
3303         /* there better be the same number of lmasters in the vnn map
3304            as there are active nodes or we will have to do a recovery
3305          */
3306         if (vnnmap->size != rec->num_active) {
3307                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3308                           vnnmap->size, rec->num_active));
3309                 ctdb_set_culprit(rec, ctdb->pnn);
3310                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3311                 return;
3312         }
3313
3314         /* verify that all active nodes in the nodemap also exist in 
3315            the vnnmap.
3316          */
3317         for (j=0; j<nodemap->num; j++) {
3318                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3319                         continue;
3320                 }
3321                 if (nodemap->nodes[j].pnn == pnn) {
3322                         continue;
3323                 }
3324
3325                 for (i=0; i<vnnmap->size; i++) {
3326                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3327                                 break;
3328                         }
3329                 }
3330                 if (i == vnnmap->size) {
3331                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3332                                   nodemap->nodes[j].pnn));
3333                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3334                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3335                         return;
3336                 }
3337         }
3338
3339         
3340         /* verify that all other nodes have the same vnnmap
3341            and are from the same generation
3342          */
3343         for (j=0; j<nodemap->num; j++) {
3344                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3345                         continue;
3346                 }
3347                 if (nodemap->nodes[j].pnn == pnn) {
3348                         continue;
3349                 }
3350
3351                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3352                                           mem_ctx, &remote_vnnmap);
3353                 if (ret != 0) {
3354                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3355                                   nodemap->nodes[j].pnn));
3356                         return;
3357                 }
3358
3359                 /* verify the vnnmap generation is the same */
3360                 if (vnnmap->generation != remote_vnnmap->generation) {
3361                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3362                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3363                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3364                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3365                         return;
3366                 }
3367
3368                 /* verify the vnnmap size is the same */
3369                 if (vnnmap->size != remote_vnnmap->size) {
3370                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3371                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3372                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3373                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3374                         return;
3375                 }
3376
3377                 /* verify the vnnmap is the same */
3378                 for (i=0;i<vnnmap->size;i++) {
3379                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3380                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3381                                           nodemap->nodes[j].pnn));
3382                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3383                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3384                                             vnnmap);
3385                                 return;
3386                         }
3387                 }
3388         }
3389
3390         /* we might need to change who has what IP assigned */
3391         if (rec->need_takeover_run) {
3392                 uint32_t culprit = (uint32_t)-1;
3393
3394                 rec->need_takeover_run = false;
3395
3396                 /* update the list of public ips that a node can handle for
3397                    all connected nodes
3398                 */
3399                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3400                 if (ret != 0) {
3401                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3402                                          culprit));
3403                         ctdb_set_culprit(rec, culprit);
3404                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3405                         return;
3406                 }
3407
3408                 /* execute the "startrecovery" event script on all nodes */
3409                 ret = run_startrecovery_eventscript(rec, nodemap);
3410                 if (ret!=0) {
3411                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3412                         ctdb_set_culprit(rec, ctdb->pnn);
3413                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3414                         return;
3415                 }
3416
3417                 ret = ctdb_takeover_run(ctdb, nodemap);
3418                 if (ret != 0) {
3419                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3420                         ctdb_set_culprit(rec, ctdb->pnn);
3421                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3422                         return;
3423                 }
3424
3425                 /* execute the "recovered" event script on all nodes */
3426                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3427 #if 0
3428 // we cant check whether the event completed successfully
3429 // since this script WILL fail if the node is in recovery mode
3430 // and if that race happens, the code here would just cause a second
3431 // cascading recovery.
3432                 if (ret!=0) {
3433                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3434                         ctdb_set_culprit(rec, ctdb->pnn);
3435                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3436                 }
3437 #endif
3438         }
3439 }
3440
3441 /*
3442   the main monitoring loop
3443  */
3444 static void monitor_cluster(struct ctdb_context *ctdb)
3445 {
3446         struct ctdb_recoverd *rec;
3447
3448         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3449
3450         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3451         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3452
3453         rec->ctdb = ctdb;
3454
3455         rec->priority_time = timeval_current();
3456
3457         /* register a message port for sending memory dumps */
3458         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3459
3460         /* register a message port for recovery elections */
3461         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3462
3463         /* when nodes are disabled/enabled */
3464         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3465
3466         /* when we are asked to puch out a flag change */
3467         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3468
3469         /* register a message port for vacuum fetch */
3470         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3471
3472         /* register a message port for reloadnodes  */
3473         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3474
3475         /* register a message port for performing a takeover run */
3476         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3477
3478         /* register a message port for disabling the ip check for a short while */
3479         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3480
3481         /* register a message port for updating the recovery daemons node assignment for an ip */
3482         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3483
3484         for (;;) {
3485                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3486                 struct timeval start;
3487                 double elapsed;
3488
3489                 if (!mem_ctx) {
3490                         DEBUG(DEBUG_CRIT,(__location__
3491                                           " Failed to create temp context\n"));
3492                         exit(-1);
3493                 }
3494
3495                 start = timeval_current();
3496                 main_loop(ctdb, rec, mem_ctx);
3497                 talloc_free(mem_ctx);
3498
3499                 /* we only check for recovery once every second */
3500                 elapsed = timeval_elapsed(&start);
3501                 if (elapsed < ctdb->tunable.recover_interval) {
3502                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3503                                           - elapsed);
3504                 }
3505         }
3506 }
3507
3508 /*
3509   event handler for when the main ctdbd dies
3510  */
3511 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3512                                  uint16_t flags, void *private_data)
3513 {
3514         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3515         _exit(1);
3516 }
3517
3518 /*
3519   called regularly to verify that the recovery daemon is still running
3520  */
3521 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3522                               struct timeval yt, void *p)
3523 {
3524         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3525
3526         if (kill(ctdb->recoverd_pid, 0) != 0) {
3527                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3528
3529                 ctdb_stop_recoverd(ctdb);
3530                 ctdb_stop_keepalive(ctdb);
3531                 ctdb_stop_monitoring(ctdb);
3532                 ctdb_release_all_ips(ctdb);
3533                 if (ctdb->methods != NULL) {
3534                         ctdb->methods->shutdown(ctdb);
3535                 }
3536                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3537
3538                 exit(10);       
3539         }
3540
3541         event_add_timed(ctdb->ev, ctdb, 
3542                         timeval_current_ofs(30, 0),
3543                         ctdb_check_recd, ctdb);
3544 }
3545
3546 static void recd_sig_child_handler(struct event_context *ev,
3547         struct signal_event *se, int signum, int count,
3548         void *dont_care, 
3549         void *private_data)
3550 {
3551 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3552         int status;
3553         pid_t pid = -1;
3554
3555         while (pid != 0) {
3556                 pid = waitpid(-1, &status, WNOHANG);
3557                 if (pid == -1) {
3558                         if (errno != ECHILD) {
3559                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3560                         }
3561                         return;
3562                 }
3563                 if (pid > 0) {
3564                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3565                 }
3566         }
3567 }
3568
3569 /*
3570   startup the recovery daemon as a child of the main ctdb daemon
3571  */
3572 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3573 {
3574         int fd[2];
3575         struct signal_event *se;
3576         struct tevent_fd *fde;
3577
3578         if (pipe(fd) != 0) {
3579                 return -1;
3580         }
3581
3582         ctdb->ctdbd_pid = getpid();
3583
3584         ctdb->recoverd_pid = fork();
3585         if (ctdb->recoverd_pid == -1) {
3586                 return -1;
3587         }
3588         
3589         if (ctdb->recoverd_pid != 0) {
3590                 close(fd[0]);
3591                 event_add_timed(ctdb->ev, ctdb, 
3592                                 timeval_current_ofs(30, 0),
3593                                 ctdb_check_recd, ctdb);
3594                 return 0;
3595         }
3596
3597         close(fd[1]);
3598
3599         srandom(getpid() ^ time(NULL));
3600
3601         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3602                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3603                 exit(1);
3604         }
3605
3606         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3607
3608         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3609                      ctdb_recoverd_parent, &fd[0]);     
3610         tevent_fd_set_auto_close(fde);
3611
3612         /* set up a handler to pick up sigchld */
3613         se = event_add_signal(ctdb->ev, ctdb,
3614                                      SIGCHLD, 0,
3615                                      recd_sig_child_handler,
3616                                      ctdb);
3617         if (se == NULL) {
3618                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3619                 exit(1);
3620         }
3621
3622         monitor_cluster(ctdb);
3623
3624         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3625         return -1;
3626 }
3627
3628 /*
3629   shutdown the recovery daemon
3630  */
3631 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3632 {
3633         if (ctdb->recoverd_pid == 0) {
3634                 return;
3635         }
3636
3637         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3638         kill(ctdb->recoverd_pid, SIGTERM);
3639 }