Add a tunable variable to control how long we defer after a ctdb addip until we force...
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68         TALLOC_CTX *deferred_rebalance_ctx;
69 };
70
71 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
72 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
73
74 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
75
76 /*
77   ban a node for a period of time
78  */
79 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
80 {
81         int ret;
82         struct ctdb_context *ctdb = rec->ctdb;
83         struct ctdb_ban_time bantime;
84        
85         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
86
87         if (!ctdb_validate_pnn(ctdb, pnn)) {
88                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
89                 return;
90         }
91
92         bantime.pnn  = pnn;
93         bantime.time = ban_time;
94
95         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
96         if (ret != 0) {
97                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
98                 return;
99         }
100
101 }
102
103 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
104
105
106 /*
107   run the "recovered" eventscript on all nodes
108  */
109 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
110 {
111         TALLOC_CTX *tmp_ctx;
112         uint32_t *nodes;
113
114         tmp_ctx = talloc_new(ctdb);
115         CTDB_NO_MEMORY(ctdb, tmp_ctx);
116
117         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
118         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
119                                         nodes, 0,
120                                         CONTROL_TIMEOUT(), false, tdb_null,
121                                         NULL, NULL,
122                                         NULL) != 0) {
123                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
124
125                 talloc_free(tmp_ctx);
126                 return -1;
127         }
128
129         talloc_free(tmp_ctx);
130         return 0;
131 }
132
133 /*
134   remember the trouble maker
135  */
136 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
137 {
138         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
139         struct ctdb_banning_state *ban_state;
140
141         if (culprit > ctdb->num_nodes) {
142                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
143                 return;
144         }
145
146         if (ctdb->nodes[culprit]->ban_state == NULL) {
147                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
148                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
149
150                 
151         }
152         ban_state = ctdb->nodes[culprit]->ban_state;
153         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
154                 /* this was the first time in a long while this node
155                    misbehaved so we will forgive any old transgressions.
156                 */
157                 ban_state->count = 0;
158         }
159
160         ban_state->count += count;
161         ban_state->last_reported_time = timeval_current();
162         rec->last_culprit_node = culprit;
163 }
164
165 /*
166   remember the trouble maker
167  */
168 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
169 {
170         ctdb_set_culprit_count(rec, culprit, 1);
171 }
172
173
174 /* this callback is called for every node that failed to execute the
175    start recovery event
176 */
177 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
178 {
179         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
180
181         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
182
183         ctdb_set_culprit(rec, node_pnn);
184 }
185
186 /*
187   run the "startrecovery" eventscript on all nodes
188  */
189 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
190 {
191         TALLOC_CTX *tmp_ctx;
192         uint32_t *nodes;
193         struct ctdb_context *ctdb = rec->ctdb;
194
195         tmp_ctx = talloc_new(ctdb);
196         CTDB_NO_MEMORY(ctdb, tmp_ctx);
197
198         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
199         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
200                                         nodes, 0,
201                                         CONTROL_TIMEOUT(), false, tdb_null,
202                                         NULL,
203                                         startrecovery_fail_callback,
204                                         rec) != 0) {
205                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
206                 talloc_free(tmp_ctx);
207                 return -1;
208         }
209
210         talloc_free(tmp_ctx);
211         return 0;
212 }
213
214 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
215 {
216         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
217                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
218                 return;
219         }
220         if (node_pnn < ctdb->num_nodes) {
221                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
222         }
223 }
224
225 /*
226   update the node capabilities for all connected nodes
227  */
228 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
229 {
230         uint32_t *nodes;
231         TALLOC_CTX *tmp_ctx;
232
233         tmp_ctx = talloc_new(ctdb);
234         CTDB_NO_MEMORY(ctdb, tmp_ctx);
235
236         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
237         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
238                                         nodes, 0,
239                                         CONTROL_TIMEOUT(),
240                                         false, tdb_null,
241                                         async_getcap_callback, NULL,
242                                         NULL) != 0) {
243                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
244                 talloc_free(tmp_ctx);
245                 return -1;
246         }
247
248         talloc_free(tmp_ctx);
249         return 0;
250 }
251
252 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
253 {
254         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
255
256         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
257         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
258 }
259
260 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
261 {
262         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
263
264         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
265         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
266 }
267
268 /*
269   change recovery mode on all nodes
270  */
271 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
272 {
273         TDB_DATA data;
274         uint32_t *nodes;
275         TALLOC_CTX *tmp_ctx;
276
277         tmp_ctx = talloc_new(ctdb);
278         CTDB_NO_MEMORY(ctdb, tmp_ctx);
279
280         /* freeze all nodes */
281         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
282         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
283                 int i;
284
285                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
286                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
287                                                 nodes, i,
288                                                 CONTROL_TIMEOUT(),
289                                                 false, tdb_null,
290                                                 NULL,
291                                                 set_recmode_fail_callback,
292                                                 rec) != 0) {
293                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
294                                 talloc_free(tmp_ctx);
295                                 return -1;
296                         }
297                 }
298         }
299
300
301         data.dsize = sizeof(uint32_t);
302         data.dptr = (unsigned char *)&rec_mode;
303
304         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
305                                         nodes, 0,
306                                         CONTROL_TIMEOUT(),
307                                         false, data,
308                                         NULL, NULL,
309                                         NULL) != 0) {
310                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
311                 talloc_free(tmp_ctx);
312                 return -1;
313         }
314
315         talloc_free(tmp_ctx);
316         return 0;
317 }
318
319 /*
320   change recovery master on all node
321  */
322 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
323 {
324         TDB_DATA data;
325         TALLOC_CTX *tmp_ctx;
326         uint32_t *nodes;
327
328         tmp_ctx = talloc_new(ctdb);
329         CTDB_NO_MEMORY(ctdb, tmp_ctx);
330
331         data.dsize = sizeof(uint32_t);
332         data.dptr = (unsigned char *)&pnn;
333
334         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
335         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
336                                         nodes, 0,
337                                         CONTROL_TIMEOUT(), false, data,
338                                         NULL, NULL,
339                                         NULL) != 0) {
340                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
341                 talloc_free(tmp_ctx);
342                 return -1;
343         }
344
345         talloc_free(tmp_ctx);
346         return 0;
347 }
348
349 /* update all remote nodes to use the same db priority that we have
350    this can fail if the remove node has not yet been upgraded to 
351    support this function, so we always return success and never fail
352    a recovery if this call fails.
353 */
354 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
355         struct ctdb_node_map *nodemap, 
356         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
357 {
358         int db;
359         uint32_t *nodes;
360
361         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
362
363         /* step through all local databases */
364         for (db=0; db<dbmap->num;db++) {
365                 TDB_DATA data;
366                 struct ctdb_db_priority db_prio;
367                 int ret;
368
369                 db_prio.db_id     = dbmap->dbs[db].dbid;
370                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
371                 if (ret != 0) {
372                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
373                         continue;
374                 }
375
376                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
377
378                 data.dptr  = (uint8_t *)&db_prio;
379                 data.dsize = sizeof(db_prio);
380
381                 if (ctdb_client_async_control(ctdb,
382                                         CTDB_CONTROL_SET_DB_PRIORITY,
383                                         nodes, 0,
384                                         CONTROL_TIMEOUT(), false, data,
385                                         NULL, NULL,
386                                         NULL) != 0) {
387                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
388                 }
389         }
390
391         return 0;
392 }                       
393
394 /*
395   ensure all other nodes have attached to any databases that we have
396  */
397 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
398                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
399 {
400         int i, j, db, ret;
401         struct ctdb_dbid_map *remote_dbmap;
402
403         /* verify that all other nodes have all our databases */
404         for (j=0; j<nodemap->num; j++) {
405                 /* we dont need to ourself ourselves */
406                 if (nodemap->nodes[j].pnn == pnn) {
407                         continue;
408                 }
409                 /* dont check nodes that are unavailable */
410                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
411                         continue;
412                 }
413
414                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
415                                          mem_ctx, &remote_dbmap);
416                 if (ret != 0) {
417                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
418                         return -1;
419                 }
420
421                 /* step through all local databases */
422                 for (db=0; db<dbmap->num;db++) {
423                         const char *name;
424
425
426                         for (i=0;i<remote_dbmap->num;i++) {
427                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
428                                         break;
429                                 }
430                         }
431                         /* the remote node already have this database */
432                         if (i!=remote_dbmap->num) {
433                                 continue;
434                         }
435                         /* ok so we need to create this database */
436                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
437                                             mem_ctx, &name);
438                         if (ret != 0) {
439                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
440                                 return -1;
441                         }
442                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
443                                            mem_ctx, name, dbmap->dbs[db].persistent);
444                         if (ret != 0) {
445                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
446                                 return -1;
447                         }
448                 }
449         }
450
451         return 0;
452 }
453
454
455 /*
456   ensure we are attached to any databases that anyone else is attached to
457  */
458 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
459                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
460 {
461         int i, j, db, ret;
462         struct ctdb_dbid_map *remote_dbmap;
463
464         /* verify that we have all database any other node has */
465         for (j=0; j<nodemap->num; j++) {
466                 /* we dont need to ourself ourselves */
467                 if (nodemap->nodes[j].pnn == pnn) {
468                         continue;
469                 }
470                 /* dont check nodes that are unavailable */
471                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
472                         continue;
473                 }
474
475                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
476                                          mem_ctx, &remote_dbmap);
477                 if (ret != 0) {
478                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
479                         return -1;
480                 }
481
482                 /* step through all databases on the remote node */
483                 for (db=0; db<remote_dbmap->num;db++) {
484                         const char *name;
485
486                         for (i=0;i<(*dbmap)->num;i++) {
487                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
488                                         break;
489                                 }
490                         }
491                         /* we already have this db locally */
492                         if (i!=(*dbmap)->num) {
493                                 continue;
494                         }
495                         /* ok so we need to create this database and
496                            rebuild dbmap
497                          */
498                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
499                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
500                         if (ret != 0) {
501                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
502                                           nodemap->nodes[j].pnn));
503                                 return -1;
504                         }
505                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
506                                            remote_dbmap->dbs[db].persistent);
507                         if (ret != 0) {
508                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
509                                 return -1;
510                         }
511                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
512                         if (ret != 0) {
513                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
514                                 return -1;
515                         }
516                 }
517         }
518
519         return 0;
520 }
521
522
523 /*
524   pull the remote database contents from one node into the recdb
525  */
526 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
527                                     struct tdb_wrap *recdb, uint32_t dbid,
528                                     bool persistent)
529 {
530         int ret;
531         TDB_DATA outdata;
532         struct ctdb_marshall_buffer *reply;
533         struct ctdb_rec_data *rec;
534         int i;
535         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
536
537         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
538                                CONTROL_TIMEOUT(), &outdata);
539         if (ret != 0) {
540                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
541                 talloc_free(tmp_ctx);
542                 return -1;
543         }
544
545         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
546
547         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
548                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
549                 talloc_free(tmp_ctx);
550                 return -1;
551         }
552         
553         rec = (struct ctdb_rec_data *)&reply->data[0];
554         
555         for (i=0;
556              i<reply->count;
557              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
558                 TDB_DATA key, data;
559                 struct ctdb_ltdb_header *hdr;
560                 TDB_DATA existing;
561                 
562                 key.dptr = &rec->data[0];
563                 key.dsize = rec->keylen;
564                 data.dptr = &rec->data[key.dsize];
565                 data.dsize = rec->datalen;
566                 
567                 hdr = (struct ctdb_ltdb_header *)data.dptr;
568
569                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
570                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
571                         talloc_free(tmp_ctx);
572                         return -1;
573                 }
574
575                 /* fetch the existing record, if any */
576                 existing = tdb_fetch(recdb->tdb, key);
577                 
578                 if (existing.dptr != NULL) {
579                         struct ctdb_ltdb_header header;
580                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
581                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
582                                          (unsigned)existing.dsize, srcnode));
583                                 free(existing.dptr);
584                                 talloc_free(tmp_ctx);
585                                 return -1;
586                         }
587                         header = *(struct ctdb_ltdb_header *)existing.dptr;
588                         free(existing.dptr);
589                         if (!(header.rsn < hdr->rsn ||
590                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
591                                 continue;
592                         }
593                 }
594                 
595                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
596                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
597                         talloc_free(tmp_ctx);
598                         return -1;                              
599                 }
600         }
601
602         talloc_free(tmp_ctx);
603
604         return 0;
605 }
606
607 /*
608   pull all the remote database contents into the recdb
609  */
610 static int pull_remote_database(struct ctdb_context *ctdb,
611                                 struct ctdb_recoverd *rec, 
612                                 struct ctdb_node_map *nodemap, 
613                                 struct tdb_wrap *recdb, uint32_t dbid,
614                                 bool persistent)
615 {
616         int j;
617
618         /* pull all records from all other nodes across onto this node
619            (this merges based on rsn)
620         */
621         for (j=0; j<nodemap->num; j++) {
622                 /* dont merge from nodes that are unavailable */
623                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
624                         continue;
625                 }
626                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
627                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
628                                  nodemap->nodes[j].pnn));
629                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
630                         return -1;
631                 }
632         }
633         
634         return 0;
635 }
636
637
638 /*
639   update flags on all active nodes
640  */
641 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
642 {
643         int ret;
644
645         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
646                 if (ret != 0) {
647                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
648                 return -1;
649         }
650
651         return 0;
652 }
653
654 /*
655   ensure all nodes have the same vnnmap we do
656  */
657 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
658                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
659 {
660         int j, ret;
661
662         /* push the new vnn map out to all the nodes */
663         for (j=0; j<nodemap->num; j++) {
664                 /* dont push to nodes that are unavailable */
665                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
666                         continue;
667                 }
668
669                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
670                 if (ret != 0) {
671                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
672                         return -1;
673                 }
674         }
675
676         return 0;
677 }
678
679
680 struct vacuum_info {
681         struct vacuum_info *next, *prev;
682         struct ctdb_recoverd *rec;
683         uint32_t srcnode;
684         struct ctdb_db_context *ctdb_db;
685         struct ctdb_marshall_buffer *recs;
686         struct ctdb_rec_data *r;
687 };
688
689 static void vacuum_fetch_next(struct vacuum_info *v);
690
691 /*
692   called when a vacuum fetch has completed - just free it and do the next one
693  */
694 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
695 {
696         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
697         talloc_free(state);
698         vacuum_fetch_next(v);
699 }
700
701
702 /*
703   process the next element from the vacuum list
704 */
705 static void vacuum_fetch_next(struct vacuum_info *v)
706 {
707         struct ctdb_call call;
708         struct ctdb_rec_data *r;
709
710         while (v->recs->count) {
711                 struct ctdb_client_call_state *state;
712                 TDB_DATA data;
713                 struct ctdb_ltdb_header *hdr;
714
715                 ZERO_STRUCT(call);
716                 call.call_id = CTDB_NULL_FUNC;
717                 call.flags = CTDB_IMMEDIATE_MIGRATION;
718                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
719
720                 r = v->r;
721                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
722                 v->recs->count--;
723
724                 call.key.dptr = &r->data[0];
725                 call.key.dsize = r->keylen;
726
727                 /* ensure we don't block this daemon - just skip a record if we can't get
728                    the chainlock */
729                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
730                         continue;
731                 }
732
733                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
734                 if (data.dptr == NULL) {
735                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
736                         continue;
737                 }
738
739                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
740                         free(data.dptr);
741                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
742                         continue;
743                 }
744                 
745                 hdr = (struct ctdb_ltdb_header *)data.dptr;
746                 if (hdr->dmaster == v->rec->ctdb->pnn) {
747                         /* its already local */
748                         free(data.dptr);
749                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
750                         continue;
751                 }
752
753                 free(data.dptr);
754
755                 state = ctdb_call_send(v->ctdb_db, &call);
756                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
757                 if (state == NULL) {
758                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
759                         talloc_free(v);
760                         return;
761                 }
762                 state->async.fn = vacuum_fetch_callback;
763                 state->async.private_data = v;
764                 return;
765         }
766
767         talloc_free(v);
768 }
769
770
771 /*
772   destroy a vacuum info structure
773  */
774 static int vacuum_info_destructor(struct vacuum_info *v)
775 {
776         DLIST_REMOVE(v->rec->vacuum_info, v);
777         return 0;
778 }
779
780
781 /*
782   handler for vacuum fetch
783 */
784 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
785                                  TDB_DATA data, void *private_data)
786 {
787         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
788         struct ctdb_marshall_buffer *recs;
789         int ret, i;
790         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
791         const char *name;
792         struct ctdb_dbid_map *dbmap=NULL;
793         bool persistent = false;
794         struct ctdb_db_context *ctdb_db;
795         struct ctdb_rec_data *r;
796         uint32_t srcnode;
797         struct vacuum_info *v;
798
799         recs = (struct ctdb_marshall_buffer *)data.dptr;
800         r = (struct ctdb_rec_data *)&recs->data[0];
801
802         if (recs->count == 0) {
803                 talloc_free(tmp_ctx);
804                 return;
805         }
806
807         srcnode = r->reqid;
808
809         for (v=rec->vacuum_info;v;v=v->next) {
810                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
811                         /* we're already working on records from this node */
812                         talloc_free(tmp_ctx);
813                         return;
814                 }
815         }
816
817         /* work out if the database is persistent */
818         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
819         if (ret != 0) {
820                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
821                 talloc_free(tmp_ctx);
822                 return;
823         }
824
825         for (i=0;i<dbmap->num;i++) {
826                 if (dbmap->dbs[i].dbid == recs->db_id) {
827                         persistent = dbmap->dbs[i].persistent;
828                         break;
829                 }
830         }
831         if (i == dbmap->num) {
832                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
833                 talloc_free(tmp_ctx);
834                 return;         
835         }
836
837         /* find the name of this database */
838         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
839                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
840                 talloc_free(tmp_ctx);
841                 return;
842         }
843
844         /* attach to it */
845         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
846         if (ctdb_db == NULL) {
847                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
848                 talloc_free(tmp_ctx);
849                 return;
850         }
851
852         v = talloc_zero(rec, struct vacuum_info);
853         if (v == NULL) {
854                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
855                 talloc_free(tmp_ctx);
856                 return;
857         }
858
859         v->rec = rec;
860         v->srcnode = srcnode;
861         v->ctdb_db = ctdb_db;
862         v->recs = talloc_memdup(v, recs, data.dsize);
863         if (v->recs == NULL) {
864                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
865                 talloc_free(v);
866                 talloc_free(tmp_ctx);
867                 return;         
868         }
869         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
870
871         DLIST_ADD(rec->vacuum_info, v);
872
873         talloc_set_destructor(v, vacuum_info_destructor);
874
875         vacuum_fetch_next(v);
876         talloc_free(tmp_ctx);
877 }
878
879
880 /*
881   called when ctdb_wait_timeout should finish
882  */
883 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
884                               struct timeval yt, void *p)
885 {
886         uint32_t *timed_out = (uint32_t *)p;
887         (*timed_out) = 1;
888 }
889
890 /*
891   wait for a given number of seconds
892  */
893 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
894 {
895         uint32_t timed_out = 0;
896         time_t usecs = (secs - (time_t)secs) * 1000000;
897         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
898         while (!timed_out) {
899                 event_loop_once(ctdb->ev);
900         }
901 }
902
903 /*
904   called when an election times out (ends)
905  */
906 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
907                                   struct timeval t, void *p)
908 {
909         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
910         rec->election_timeout = NULL;
911         fast_start = false;
912
913         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
914 }
915
916
917 /*
918   wait for an election to finish. It finished election_timeout seconds after
919   the last election packet is received
920  */
921 static void ctdb_wait_election(struct ctdb_recoverd *rec)
922 {
923         struct ctdb_context *ctdb = rec->ctdb;
924         while (rec->election_timeout) {
925                 event_loop_once(ctdb->ev);
926         }
927 }
928
929 /*
930   Update our local flags from all remote connected nodes. 
931   This is only run when we are or we belive we are the recovery master
932  */
933 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
934 {
935         int j;
936         struct ctdb_context *ctdb = rec->ctdb;
937         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
938
939         /* get the nodemap for all active remote nodes and verify
940            they are the same as for this node
941          */
942         for (j=0; j<nodemap->num; j++) {
943                 struct ctdb_node_map *remote_nodemap=NULL;
944                 int ret;
945
946                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
947                         continue;
948                 }
949                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
950                         continue;
951                 }
952
953                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
954                                            mem_ctx, &remote_nodemap);
955                 if (ret != 0) {
956                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
957                                   nodemap->nodes[j].pnn));
958                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
959                         talloc_free(mem_ctx);
960                         return MONITOR_FAILED;
961                 }
962                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
963                         /* We should tell our daemon about this so it
964                            updates its flags or else we will log the same 
965                            message again in the next iteration of recovery.
966                            Since we are the recovery master we can just as
967                            well update the flags on all nodes.
968                         */
969                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
970                         if (ret != 0) {
971                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
972                                 return -1;
973                         }
974
975                         /* Update our local copy of the flags in the recovery
976                            daemon.
977                         */
978                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
979                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
980                                  nodemap->nodes[j].flags));
981                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
982                 }
983                 talloc_free(remote_nodemap);
984         }
985         talloc_free(mem_ctx);
986         return MONITOR_OK;
987 }
988
989
990 /* Create a new random generation ip. 
991    The generation id can not be the INVALID_GENERATION id
992 */
993 static uint32_t new_generation(void)
994 {
995         uint32_t generation;
996
997         while (1) {
998                 generation = random();
999
1000                 if (generation != INVALID_GENERATION) {
1001                         break;
1002                 }
1003         }
1004
1005         return generation;
1006 }
1007
1008
1009 /*
1010   create a temporary working database
1011  */
1012 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1013 {
1014         char *name;
1015         struct tdb_wrap *recdb;
1016         unsigned tdb_flags;
1017
1018         /* open up the temporary recovery database */
1019         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1020                                ctdb->db_directory_state,
1021                                ctdb->pnn);
1022         if (name == NULL) {
1023                 return NULL;
1024         }
1025         unlink(name);
1026
1027         tdb_flags = TDB_NOLOCK;
1028         if (ctdb->valgrinding) {
1029                 tdb_flags |= TDB_NOMMAP;
1030         }
1031         tdb_flags |= TDB_DISALLOW_NESTING;
1032
1033         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1034                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1035         if (recdb == NULL) {
1036                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1037         }
1038
1039         talloc_free(name);
1040
1041         return recdb;
1042 }
1043
1044
1045 /* 
1046    a traverse function for pulling all relevent records from recdb
1047  */
1048 struct recdb_data {
1049         struct ctdb_context *ctdb;
1050         struct ctdb_marshall_buffer *recdata;
1051         uint32_t len;
1052         bool failed;
1053         bool persistent;
1054 };
1055
1056 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1057 {
1058         struct recdb_data *params = (struct recdb_data *)p;
1059         struct ctdb_rec_data *rec;
1060         struct ctdb_ltdb_header *hdr;
1061
1062         /* skip empty records */
1063         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1064                 return 0;
1065         }
1066
1067         /* update the dmaster field to point to us */
1068         hdr = (struct ctdb_ltdb_header *)data.dptr;
1069         if (!params->persistent) {
1070                 hdr->dmaster = params->ctdb->pnn;
1071                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1072         }
1073
1074         /* add the record to the blob ready to send to the nodes */
1075         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1076         if (rec == NULL) {
1077                 params->failed = true;
1078                 return -1;
1079         }
1080         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1081         if (params->recdata == NULL) {
1082                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1083                          rec->length + params->len, params->recdata->count));
1084                 params->failed = true;
1085                 return -1;
1086         }
1087         params->recdata->count++;
1088         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1089         params->len += rec->length;
1090         talloc_free(rec);
1091
1092         return 0;
1093 }
1094
1095 /*
1096   push the recdb database out to all nodes
1097  */
1098 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1099                                bool persistent,
1100                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1101 {
1102         struct recdb_data params;
1103         struct ctdb_marshall_buffer *recdata;
1104         TDB_DATA outdata;
1105         TALLOC_CTX *tmp_ctx;
1106         uint32_t *nodes;
1107
1108         tmp_ctx = talloc_new(ctdb);
1109         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1110
1111         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1112         CTDB_NO_MEMORY(ctdb, recdata);
1113
1114         recdata->db_id = dbid;
1115
1116         params.ctdb = ctdb;
1117         params.recdata = recdata;
1118         params.len = offsetof(struct ctdb_marshall_buffer, data);
1119         params.failed = false;
1120         params.persistent = persistent;
1121
1122         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1123                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1124                 talloc_free(params.recdata);
1125                 talloc_free(tmp_ctx);
1126                 return -1;
1127         }
1128
1129         if (params.failed) {
1130                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1131                 talloc_free(params.recdata);
1132                 talloc_free(tmp_ctx);
1133                 return -1;              
1134         }
1135
1136         recdata = params.recdata;
1137
1138         outdata.dptr = (void *)recdata;
1139         outdata.dsize = params.len;
1140
1141         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1142         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1143                                         nodes, 0,
1144                                         CONTROL_TIMEOUT(), false, outdata,
1145                                         NULL, NULL,
1146                                         NULL) != 0) {
1147                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1148                 talloc_free(recdata);
1149                 talloc_free(tmp_ctx);
1150                 return -1;
1151         }
1152
1153         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1154                   dbid, recdata->count));
1155
1156         talloc_free(recdata);
1157         talloc_free(tmp_ctx);
1158
1159         return 0;
1160 }
1161
1162
1163 /*
1164   go through a full recovery on one database 
1165  */
1166 static int recover_database(struct ctdb_recoverd *rec, 
1167                             TALLOC_CTX *mem_ctx,
1168                             uint32_t dbid,
1169                             bool persistent,
1170                             uint32_t pnn, 
1171                             struct ctdb_node_map *nodemap,
1172                             uint32_t transaction_id)
1173 {
1174         struct tdb_wrap *recdb;
1175         int ret;
1176         struct ctdb_context *ctdb = rec->ctdb;
1177         TDB_DATA data;
1178         struct ctdb_control_wipe_database w;
1179         uint32_t *nodes;
1180
1181         recdb = create_recdb(ctdb, mem_ctx);
1182         if (recdb == NULL) {
1183                 return -1;
1184         }
1185
1186         /* pull all remote databases onto the recdb */
1187         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1188         if (ret != 0) {
1189                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1190                 return -1;
1191         }
1192
1193         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1194
1195         /* wipe all the remote databases. This is safe as we are in a transaction */
1196         w.db_id = dbid;
1197         w.transaction_id = transaction_id;
1198
1199         data.dptr = (void *)&w;
1200         data.dsize = sizeof(w);
1201
1202         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1203         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1204                                         nodes, 0,
1205                                         CONTROL_TIMEOUT(), false, data,
1206                                         NULL, NULL,
1207                                         NULL) != 0) {
1208                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1209                 talloc_free(recdb);
1210                 return -1;
1211         }
1212         
1213         /* push out the correct database. This sets the dmaster and skips 
1214            the empty records */
1215         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1216         if (ret != 0) {
1217                 talloc_free(recdb);
1218                 return -1;
1219         }
1220
1221         /* all done with this database */
1222         talloc_free(recdb);
1223
1224         return 0;
1225 }
1226
1227 /*
1228   reload the nodes file 
1229 */
1230 static void reload_nodes_file(struct ctdb_context *ctdb)
1231 {
1232         ctdb->nodes = NULL;
1233         ctdb_load_nodes_file(ctdb);
1234 }
1235
1236 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1237                                          struct ctdb_recoverd *rec,
1238                                          struct ctdb_node_map *nodemap,
1239                                          uint32_t *culprit)
1240 {
1241         int j;
1242         int ret;
1243
1244         if (ctdb->num_nodes != nodemap->num) {
1245                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1246                                   ctdb->num_nodes, nodemap->num));
1247                 if (culprit) {
1248                         *culprit = ctdb->pnn;
1249                 }
1250                 return -1;
1251         }
1252
1253         for (j=0; j<nodemap->num; j++) {
1254                 /* release any existing data */
1255                 if (ctdb->nodes[j]->known_public_ips) {
1256                         talloc_free(ctdb->nodes[j]->known_public_ips);
1257                         ctdb->nodes[j]->known_public_ips = NULL;
1258                 }
1259                 if (ctdb->nodes[j]->available_public_ips) {
1260                         talloc_free(ctdb->nodes[j]->available_public_ips);
1261                         ctdb->nodes[j]->available_public_ips = NULL;
1262                 }
1263
1264                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1265                         continue;
1266                 }
1267
1268                 /* grab a new shiny list of public ips from the node */
1269                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1270                                         CONTROL_TIMEOUT(),
1271                                         ctdb->nodes[j]->pnn,
1272                                         ctdb->nodes,
1273                                         0,
1274                                         &ctdb->nodes[j]->known_public_ips);
1275                 if (ret != 0) {
1276                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1277                                 ctdb->nodes[j]->pnn));
1278                         if (culprit) {
1279                                 *culprit = ctdb->nodes[j]->pnn;
1280                         }
1281                         return -1;
1282                 }
1283
1284                 if (ctdb->tunable.disable_ip_failover == 0) {
1285                         if (rec->ip_check_disable_ctx == NULL) {
1286                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1287                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1288                                         rec->need_takeover_run = true;
1289                                 }
1290                         }
1291                 }
1292
1293                 /* grab a new shiny list of public ips from the node */
1294                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1295                                         CONTROL_TIMEOUT(),
1296                                         ctdb->nodes[j]->pnn,
1297                                         ctdb->nodes,
1298                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1299                                         &ctdb->nodes[j]->available_public_ips);
1300                 if (ret != 0) {
1301                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1302                                 ctdb->nodes[j]->pnn));
1303                         if (culprit) {
1304                                 *culprit = ctdb->nodes[j]->pnn;
1305                         }
1306                         return -1;
1307                 }
1308         }
1309
1310         return 0;
1311 }
1312
1313 /* when we start a recovery, make sure all nodes use the same reclock file
1314    setting
1315 */
1316 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1317 {
1318         struct ctdb_context *ctdb = rec->ctdb;
1319         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1320         TDB_DATA data;
1321         uint32_t *nodes;
1322
1323         if (ctdb->recovery_lock_file == NULL) {
1324                 data.dptr  = NULL;
1325                 data.dsize = 0;
1326         } else {
1327                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1328                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1329         }
1330
1331         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1332         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1333                                         nodes, 0,
1334                                         CONTROL_TIMEOUT(),
1335                                         false, data,
1336                                         NULL, NULL,
1337                                         rec) != 0) {
1338                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1339                 talloc_free(tmp_ctx);
1340                 return -1;
1341         }
1342
1343         talloc_free(tmp_ctx);
1344         return 0;
1345 }
1346
1347
1348 /*
1349   we are the recmaster, and recovery is needed - start a recovery run
1350  */
1351 static int do_recovery(struct ctdb_recoverd *rec, 
1352                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1353                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1354 {
1355         struct ctdb_context *ctdb = rec->ctdb;
1356         int i, j, ret;
1357         uint32_t generation;
1358         struct ctdb_dbid_map *dbmap;
1359         TDB_DATA data;
1360         uint32_t *nodes;
1361         struct timeval start_time;
1362         uint32_t culprit = (uint32_t)-1;
1363
1364         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1365
1366         /* if recovery fails, force it again */
1367         rec->need_recovery = true;
1368
1369         for (i=0; i<ctdb->num_nodes; i++) {
1370                 struct ctdb_banning_state *ban_state;
1371
1372                 if (ctdb->nodes[i]->ban_state == NULL) {
1373                         continue;
1374                 }
1375                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1376                 if (ban_state->count < 2*ctdb->num_nodes) {
1377                         continue;
1378                 }
1379                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1380                         ctdb->nodes[i]->pnn, ban_state->count,
1381                         ctdb->tunable.recovery_ban_period));
1382                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1383                 ban_state->count = 0;
1384         }
1385
1386
1387         if (ctdb->tunable.verify_recovery_lock != 0) {
1388                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1389                 start_time = timeval_current();
1390                 if (!ctdb_recovery_lock(ctdb, true)) {
1391                         ctdb_set_culprit(rec, pnn);
1392                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1393                         return -1;
1394                 }
1395                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1396                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1397         }
1398
1399         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1400
1401         /* get a list of all databases */
1402         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1403         if (ret != 0) {
1404                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1405                 return -1;
1406         }
1407
1408         /* we do the db creation before we set the recovery mode, so the freeze happens
1409            on all databases we will be dealing with. */
1410
1411         /* verify that we have all the databases any other node has */
1412         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1413         if (ret != 0) {
1414                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1415                 return -1;
1416         }
1417
1418         /* verify that all other nodes have all our databases */
1419         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1420         if (ret != 0) {
1421                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1422                 return -1;
1423         }
1424         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1425
1426         /* update the database priority for all remote databases */
1427         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1428         if (ret != 0) {
1429                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1430         }
1431         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1432
1433
1434         /* update all other nodes to use the same setting for reclock files
1435            as the local recovery master.
1436         */
1437         sync_recovery_lock_file_across_cluster(rec);
1438
1439         /* set recovery mode to active on all nodes */
1440         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1441         if (ret != 0) {
1442                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1443                 return -1;
1444         }
1445
1446         /* execute the "startrecovery" event script on all nodes */
1447         ret = run_startrecovery_eventscript(rec, nodemap);
1448         if (ret!=0) {
1449                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1450                 return -1;
1451         }
1452
1453         /*
1454           update all nodes to have the same flags that we have
1455          */
1456         for (i=0;i<nodemap->num;i++) {
1457                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1458                         continue;
1459                 }
1460
1461                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1462                 if (ret != 0) {
1463                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1464                         return -1;
1465                 }
1466         }
1467
1468         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1469
1470         /* pick a new generation number */
1471         generation = new_generation();
1472
1473         /* change the vnnmap on this node to use the new generation 
1474            number but not on any other nodes.
1475            this guarantees that if we abort the recovery prematurely
1476            for some reason (a node stops responding?)
1477            that we can just return immediately and we will reenter
1478            recovery shortly again.
1479            I.e. we deliberately leave the cluster with an inconsistent
1480            generation id to allow us to abort recovery at any stage and
1481            just restart it from scratch.
1482          */
1483         vnnmap->generation = generation;
1484         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1485         if (ret != 0) {
1486                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1487                 return -1;
1488         }
1489
1490         data.dptr = (void *)&generation;
1491         data.dsize = sizeof(uint32_t);
1492
1493         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1494         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1495                                         nodes, 0,
1496                                         CONTROL_TIMEOUT(), false, data,
1497                                         NULL,
1498                                         transaction_start_fail_callback,
1499                                         rec) != 0) {
1500                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1501                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1502                                         nodes, 0,
1503                                         CONTROL_TIMEOUT(), false, tdb_null,
1504                                         NULL,
1505                                         NULL,
1506                                         NULL) != 0) {
1507                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1508                 }
1509                 return -1;
1510         }
1511
1512         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1513
1514         for (i=0;i<dbmap->num;i++) {
1515                 ret = recover_database(rec, mem_ctx,
1516                                        dbmap->dbs[i].dbid,
1517                                        dbmap->dbs[i].persistent,
1518                                        pnn, nodemap, generation);
1519                 if (ret != 0) {
1520                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1521                         return -1;
1522                 }
1523         }
1524
1525         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1526
1527         /* commit all the changes */
1528         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1529                                         nodes, 0,
1530                                         CONTROL_TIMEOUT(), false, data,
1531                                         NULL, NULL,
1532                                         NULL) != 0) {
1533                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1534                 return -1;
1535         }
1536
1537         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1538         
1539
1540         /* update the capabilities for all nodes */
1541         ret = update_capabilities(ctdb, nodemap);
1542         if (ret!=0) {
1543                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1544                 return -1;
1545         }
1546
1547         /* build a new vnn map with all the currently active and
1548            unbanned nodes */
1549         generation = new_generation();
1550         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1551         CTDB_NO_MEMORY(ctdb, vnnmap);
1552         vnnmap->generation = generation;
1553         vnnmap->size = 0;
1554         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1555         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1556         for (i=j=0;i<nodemap->num;i++) {
1557                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1558                         continue;
1559                 }
1560                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1561                         /* this node can not be an lmaster */
1562                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1563                         continue;
1564                 }
1565
1566                 vnnmap->size++;
1567                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1568                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1569                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1570
1571         }
1572         if (vnnmap->size == 0) {
1573                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1574                 vnnmap->size++;
1575                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1576                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1577                 vnnmap->map[0] = pnn;
1578         }       
1579
1580         /* update to the new vnnmap on all nodes */
1581         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1582         if (ret != 0) {
1583                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1584                 return -1;
1585         }
1586
1587         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1588
1589         /* update recmaster to point to us for all nodes */
1590         ret = set_recovery_master(ctdb, nodemap, pnn);
1591         if (ret!=0) {
1592                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1593                 return -1;
1594         }
1595
1596         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1597
1598         /*
1599           update all nodes to have the same flags that we have
1600          */
1601         for (i=0;i<nodemap->num;i++) {
1602                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1603                         continue;
1604                 }
1605
1606                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1607                 if (ret != 0) {
1608                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1609                         return -1;
1610                 }
1611         }
1612
1613         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1614
1615         /* disable recovery mode */
1616         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1617         if (ret != 0) {
1618                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1619                 return -1;
1620         }
1621
1622         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1623
1624         /*
1625           tell nodes to takeover their public IPs
1626          */
1627         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1628         if (ret != 0) {
1629                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1630                                  culprit));
1631                 rec->need_takeover_run = true;
1632                 return -1;
1633         }
1634         rec->need_takeover_run = false;
1635         ret = ctdb_takeover_run(ctdb, nodemap);
1636         if (ret != 0) {
1637                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1638                 rec->need_takeover_run = true;
1639         }
1640
1641         /* execute the "recovered" event script on all nodes */
1642         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1643         if (ret!=0) {
1644                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1645                 return -1;
1646         }
1647
1648         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1649
1650         /* send a message to all clients telling them that the cluster 
1651            has been reconfigured */
1652         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1653
1654         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1655
1656         rec->need_recovery = false;
1657
1658         /* we managed to complete a full recovery, make sure to forgive
1659            any past sins by the nodes that could now participate in the
1660            recovery.
1661         */
1662         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1663         for (i=0;i<nodemap->num;i++) {
1664                 struct ctdb_banning_state *ban_state;
1665
1666                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1667                         continue;
1668                 }
1669
1670                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1671                 if (ban_state == NULL) {
1672                         continue;
1673                 }
1674
1675                 ban_state->count = 0;
1676         }
1677
1678
1679         /* We just finished a recovery successfully. 
1680            We now wait for rerecovery_timeout before we allow 
1681            another recovery to take place.
1682         */
1683         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1684         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1685         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1686
1687         return 0;
1688 }
1689
1690
1691 /*
1692   elections are won by first checking the number of connected nodes, then
1693   the priority time, then the pnn
1694  */
1695 struct election_message {
1696         uint32_t num_connected;
1697         struct timeval priority_time;
1698         uint32_t pnn;
1699         uint32_t node_flags;
1700 };
1701
1702 /*
1703   form this nodes election data
1704  */
1705 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1706 {
1707         int ret, i;
1708         struct ctdb_node_map *nodemap;
1709         struct ctdb_context *ctdb = rec->ctdb;
1710
1711         ZERO_STRUCTP(em);
1712
1713         em->pnn = rec->ctdb->pnn;
1714         em->priority_time = rec->priority_time;
1715
1716         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1717         if (ret != 0) {
1718                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1719                 return;
1720         }
1721
1722         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1723         em->node_flags = rec->node_flags;
1724
1725         for (i=0;i<nodemap->num;i++) {
1726                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1727                         em->num_connected++;
1728                 }
1729         }
1730
1731         /* we shouldnt try to win this election if we cant be a recmaster */
1732         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1733                 em->num_connected = 0;
1734                 em->priority_time = timeval_current();
1735         }
1736
1737         talloc_free(nodemap);
1738 }
1739
1740 /*
1741   see if the given election data wins
1742  */
1743 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1744 {
1745         struct election_message myem;
1746         int cmp = 0;
1747
1748         ctdb_election_data(rec, &myem);
1749
1750         /* we cant win if we dont have the recmaster capability */
1751         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1752                 return false;
1753         }
1754
1755         /* we cant win if we are banned */
1756         if (rec->node_flags & NODE_FLAGS_BANNED) {
1757                 return false;
1758         }       
1759
1760         /* we cant win if we are stopped */
1761         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1762                 return false;
1763         }       
1764
1765         /* we will automatically win if the other node is banned */
1766         if (em->node_flags & NODE_FLAGS_BANNED) {
1767                 return true;
1768         }
1769
1770         /* we will automatically win if the other node is banned */
1771         if (em->node_flags & NODE_FLAGS_STOPPED) {
1772                 return true;
1773         }
1774
1775         /* try to use the most connected node */
1776         if (cmp == 0) {
1777                 cmp = (int)myem.num_connected - (int)em->num_connected;
1778         }
1779
1780         /* then the longest running node */
1781         if (cmp == 0) {
1782                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1783         }
1784
1785         if (cmp == 0) {
1786                 cmp = (int)myem.pnn - (int)em->pnn;
1787         }
1788
1789         return cmp > 0;
1790 }
1791
1792 /*
1793   send out an election request
1794  */
1795 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1796 {
1797         int ret;
1798         TDB_DATA election_data;
1799         struct election_message emsg;
1800         uint64_t srvid;
1801         struct ctdb_context *ctdb = rec->ctdb;
1802
1803         srvid = CTDB_SRVID_RECOVERY;
1804
1805         ctdb_election_data(rec, &emsg);
1806
1807         election_data.dsize = sizeof(struct election_message);
1808         election_data.dptr  = (unsigned char *)&emsg;
1809
1810
1811         /* send an election message to all active nodes */
1812         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1813         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1814
1815
1816         /* A new node that is already frozen has entered the cluster.
1817            The existing nodes are not frozen and dont need to be frozen
1818            until the election has ended and we start the actual recovery
1819         */
1820         if (update_recmaster == true) {
1821                 /* first we assume we will win the election and set 
1822                    recoverymaster to be ourself on the current node
1823                  */
1824                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1825                 if (ret != 0) {
1826                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1827                         return -1;
1828                 }
1829         }
1830
1831
1832         return 0;
1833 }
1834
1835 /*
1836   this function will unban all nodes in the cluster
1837 */
1838 static void unban_all_nodes(struct ctdb_context *ctdb)
1839 {
1840         int ret, i;
1841         struct ctdb_node_map *nodemap;
1842         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1843         
1844         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1845         if (ret != 0) {
1846                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1847                 return;
1848         }
1849
1850         for (i=0;i<nodemap->num;i++) {
1851                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1852                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1853                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1854                 }
1855         }
1856
1857         talloc_free(tmp_ctx);
1858 }
1859
1860
1861 /*
1862   we think we are winning the election - send a broadcast election request
1863  */
1864 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1865 {
1866         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1867         int ret;
1868
1869         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1870         if (ret != 0) {
1871                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1872         }
1873
1874         talloc_free(rec->send_election_te);
1875         rec->send_election_te = NULL;
1876 }
1877
1878 /*
1879   handler for memory dumps
1880 */
1881 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1882                              TDB_DATA data, void *private_data)
1883 {
1884         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1885         TDB_DATA *dump;
1886         int ret;
1887         struct rd_memdump_reply *rd;
1888
1889         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1890                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1891                 talloc_free(tmp_ctx);
1892                 return;
1893         }
1894         rd = (struct rd_memdump_reply *)data.dptr;
1895
1896         dump = talloc_zero(tmp_ctx, TDB_DATA);
1897         if (dump == NULL) {
1898                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1899                 talloc_free(tmp_ctx);
1900                 return;
1901         }
1902         ret = ctdb_dump_memory(ctdb, dump);
1903         if (ret != 0) {
1904                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1905                 talloc_free(tmp_ctx);
1906                 return;
1907         }
1908
1909 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1910
1911         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1912         if (ret != 0) {
1913                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1914                 talloc_free(tmp_ctx);
1915                 return;
1916         }
1917
1918         talloc_free(tmp_ctx);
1919 }
1920
1921 /*
1922   handler for reload_nodes
1923 */
1924 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1925                              TDB_DATA data, void *private_data)
1926 {
1927         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1928
1929         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1930
1931         reload_nodes_file(rec->ctdb);
1932 }
1933
1934
1935 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1936                               struct timeval yt, void *p)
1937 {
1938         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1939
1940         talloc_free(rec->ip_check_disable_ctx);
1941         rec->ip_check_disable_ctx = NULL;
1942 }
1943
1944
1945 static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
1946                                   struct timeval t, void *p)
1947 {
1948         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1949         struct ctdb_context *ctdb = rec->ctdb;
1950         int ret;
1951
1952         DEBUG(DEBUG_NOTICE,("Rebalance all nodes that have had ip assignment changes.\n"));
1953
1954         ret = ctdb_takeover_run(ctdb, rec->nodemap);
1955         if (ret != 0) {
1956                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1957                 rec->need_takeover_run = true;
1958         }
1959
1960         talloc_free(rec->deferred_rebalance_ctx);
1961         rec->deferred_rebalance_ctx = NULL;
1962 }
1963
1964         
1965 static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1966                              TDB_DATA data, void *private_data)
1967 {
1968         uint32_t pnn;
1969         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1970
1971         if (data.dsize != sizeof(uint32_t)) {
1972                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
1973                 return;
1974         }
1975
1976         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
1977                 return;
1978         }
1979
1980         pnn = *(uint32_t *)&data.dptr[0];
1981
1982         lcp2_forcerebalance(ctdb, pnn);
1983         DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
1984
1985         if (rec->deferred_rebalance_ctx != NULL) {
1986                 talloc_free(rec->deferred_rebalance_ctx);
1987         }
1988         rec->deferred_rebalance_ctx = talloc_new(rec);
1989         event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
1990                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
1991                         ctdb_rebalance_timeout, rec);
1992 }
1993
1994
1995
1996 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1997                              TDB_DATA data, void *private_data)
1998 {
1999         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2000         struct ctdb_public_ip *ip;
2001
2002         if (rec->recmaster != rec->ctdb->pnn) {
2003                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2004                 return;
2005         }
2006
2007         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2008                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2009                 return;
2010         }
2011
2012         ip = (struct ctdb_public_ip *)data.dptr;
2013
2014         update_ip_assignment_tree(rec->ctdb, ip);
2015 }
2016
2017
2018 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2019                              TDB_DATA data, void *private_data)
2020 {
2021         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2022         uint32_t timeout;
2023
2024         if (rec->ip_check_disable_ctx != NULL) {
2025                 talloc_free(rec->ip_check_disable_ctx);
2026                 rec->ip_check_disable_ctx = NULL;
2027         }
2028
2029         if (data.dsize != sizeof(uint32_t)) {
2030                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2031                                  "expexting %lu\n", (long unsigned)data.dsize,
2032                                  (long unsigned)sizeof(uint32_t)));
2033                 return;
2034         }
2035         if (data.dptr == NULL) {
2036                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
2037                 return;
2038         }
2039
2040         timeout = *((uint32_t *)data.dptr);
2041         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
2042
2043         rec->ip_check_disable_ctx = talloc_new(rec);
2044         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2045
2046         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2047 }
2048
2049
2050 /*
2051   handler for ip reallocate, just add it to the list of callers and 
2052   handle this later in the monitor_cluster loop so we do not recurse
2053   with other callers to takeover_run()
2054 */
2055 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2056                              TDB_DATA data, void *private_data)
2057 {
2058         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2059         struct ip_reallocate_list *caller;
2060
2061         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2062                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2063                 return;
2064         }
2065
2066         if (rec->ip_reallocate_ctx == NULL) {
2067                 rec->ip_reallocate_ctx = talloc_new(rec);
2068                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2069         }
2070
2071         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2072         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2073
2074         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2075         caller->next = rec->reallocate_callers;
2076         rec->reallocate_callers = caller;
2077
2078         return;
2079 }
2080
2081 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2082 {
2083         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2084         TDB_DATA result;
2085         int32_t ret;
2086         struct ip_reallocate_list *callers;
2087         uint32_t culprit;
2088
2089         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2090
2091         /* update the list of public ips that a node can handle for
2092            all connected nodes
2093         */
2094         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2095         if (ret != 0) {
2096                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2097                                  culprit));
2098                 rec->need_takeover_run = true;
2099         }
2100         if (ret == 0) {
2101                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2102                 if (ret != 0) {
2103                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2104                         rec->need_takeover_run = true;
2105                 }
2106         }
2107
2108         result.dsize = sizeof(int32_t);
2109         result.dptr  = (uint8_t *)&ret;
2110
2111         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2112
2113                 /* Someone that sent srvid==0 does not want a reply */
2114                 if (callers->rd->srvid == 0) {
2115                         continue;
2116                 }
2117                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2118                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2119                                   (unsigned long long)callers->rd->srvid));
2120                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2121                 if (ret != 0) {
2122                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2123                                          "message to %u:%llu\n",
2124                                          (unsigned)callers->rd->pnn,
2125                                          (unsigned long long)callers->rd->srvid));
2126                 }
2127         }
2128
2129         talloc_free(tmp_ctx);
2130         talloc_free(rec->ip_reallocate_ctx);
2131         rec->ip_reallocate_ctx = NULL;
2132         rec->reallocate_callers = NULL;
2133         
2134 }
2135
2136
2137 /*
2138   handler for recovery master elections
2139 */
2140 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2141                              TDB_DATA data, void *private_data)
2142 {
2143         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2144         int ret;
2145         struct election_message *em = (struct election_message *)data.dptr;
2146         TALLOC_CTX *mem_ctx;
2147
2148         /* we got an election packet - update the timeout for the election */
2149         talloc_free(rec->election_timeout);
2150         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2151                                                 fast_start ?
2152                                                 timeval_current_ofs(0, 500000) :
2153                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2154                                                 ctdb_election_timeout, rec);
2155
2156         mem_ctx = talloc_new(ctdb);
2157
2158         /* someone called an election. check their election data
2159            and if we disagree and we would rather be the elected node, 
2160            send a new election message to all other nodes
2161          */
2162         if (ctdb_election_win(rec, em)) {
2163                 if (!rec->send_election_te) {
2164                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2165                                                                 timeval_current_ofs(0, 500000),
2166                                                                 election_send_request, rec);
2167                 }
2168                 talloc_free(mem_ctx);
2169                 /*unban_all_nodes(ctdb);*/
2170                 return;
2171         }
2172         
2173         /* we didn't win */
2174         talloc_free(rec->send_election_te);
2175         rec->send_election_te = NULL;
2176
2177         if (ctdb->tunable.verify_recovery_lock != 0) {
2178                 /* release the recmaster lock */
2179                 if (em->pnn != ctdb->pnn &&
2180                     ctdb->recovery_lock_fd != -1) {
2181                         close(ctdb->recovery_lock_fd);
2182                         ctdb->recovery_lock_fd = -1;
2183                         unban_all_nodes(ctdb);
2184                 }
2185         }
2186
2187         /* ok, let that guy become recmaster then */
2188         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2189         if (ret != 0) {
2190                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2191                 talloc_free(mem_ctx);
2192                 return;
2193         }
2194
2195         talloc_free(mem_ctx);
2196         return;
2197 }
2198
2199
2200 /*
2201   force the start of the election process
2202  */
2203 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2204                            struct ctdb_node_map *nodemap)
2205 {
2206         int ret;
2207         struct ctdb_context *ctdb = rec->ctdb;
2208
2209         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2210
2211         /* set all nodes to recovery mode to stop all internode traffic */
2212         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2213         if (ret != 0) {
2214                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2215                 return;
2216         }
2217
2218         talloc_free(rec->election_timeout);
2219         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2220                                                 fast_start ?
2221                                                 timeval_current_ofs(0, 500000) :
2222                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2223                                                 ctdb_election_timeout, rec);
2224
2225         ret = send_election_request(rec, pnn, true);
2226         if (ret!=0) {
2227                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2228                 return;
2229         }
2230
2231         /* wait for a few seconds to collect all responses */
2232         ctdb_wait_election(rec);
2233 }
2234
2235
2236
2237 /*
2238   handler for when a node changes its flags
2239 */
2240 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2241                             TDB_DATA data, void *private_data)
2242 {
2243         int ret;
2244         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2245         struct ctdb_node_map *nodemap=NULL;
2246         TALLOC_CTX *tmp_ctx;
2247         uint32_t changed_flags;
2248         int i;
2249         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2250         int disabled_flag_changed;
2251
2252         if (data.dsize != sizeof(*c)) {
2253                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2254                 return;
2255         }
2256
2257         tmp_ctx = talloc_new(ctdb);
2258         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2259
2260         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2261         if (ret != 0) {
2262                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2263                 talloc_free(tmp_ctx);
2264                 return;         
2265         }
2266
2267
2268         for (i=0;i<nodemap->num;i++) {
2269                 if (nodemap->nodes[i].pnn == c->pnn) break;
2270         }
2271
2272         if (i == nodemap->num) {
2273                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2274                 talloc_free(tmp_ctx);
2275                 return;
2276         }
2277
2278         changed_flags = c->old_flags ^ c->new_flags;
2279
2280         if (nodemap->nodes[i].flags != c->new_flags) {
2281                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2282         }
2283
2284         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2285
2286         nodemap->nodes[i].flags = c->new_flags;
2287
2288         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2289                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2290
2291         if (ret == 0) {
2292                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2293                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2294         }
2295         
2296         if (ret == 0 &&
2297             ctdb->recovery_master == ctdb->pnn &&
2298             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2299                 /* Only do the takeover run if the perm disabled or unhealthy
2300                    flags changed since these will cause an ip failover but not
2301                    a recovery.
2302                    If the node became disconnected or banned this will also
2303                    lead to an ip address failover but that is handled 
2304                    during recovery
2305                 */
2306                 if (disabled_flag_changed) {
2307                         rec->need_takeover_run = true;
2308                 }
2309         }
2310
2311         talloc_free(tmp_ctx);
2312 }
2313
2314 /*
2315   handler for when we need to push out flag changes ot all other nodes
2316 */
2317 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2318                             TDB_DATA data, void *private_data)
2319 {
2320         int ret;
2321         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2322         struct ctdb_node_map *nodemap=NULL;
2323         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2324         uint32_t recmaster;
2325         uint32_t *nodes;
2326
2327         /* find the recovery master */
2328         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2329         if (ret != 0) {
2330                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2331                 talloc_free(tmp_ctx);
2332                 return;
2333         }
2334
2335         /* read the node flags from the recmaster */
2336         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2337         if (ret != 0) {
2338                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2339                 talloc_free(tmp_ctx);
2340                 return;
2341         }
2342         if (c->pnn >= nodemap->num) {
2343                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2344                 talloc_free(tmp_ctx);
2345                 return;
2346         }
2347
2348         /* send the flags update to all connected nodes */
2349         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2350
2351         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2352                                       nodes, 0, CONTROL_TIMEOUT(),
2353                                       false, data,
2354                                       NULL, NULL,
2355                                       NULL) != 0) {
2356                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2357
2358                 talloc_free(tmp_ctx);
2359                 return;
2360         }
2361
2362         talloc_free(tmp_ctx);
2363 }
2364
2365
2366 struct verify_recmode_normal_data {
2367         uint32_t count;
2368         enum monitor_result status;
2369 };
2370
2371 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2372 {
2373         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2374
2375
2376         /* one more node has responded with recmode data*/
2377         rmdata->count--;
2378
2379         /* if we failed to get the recmode, then return an error and let
2380            the main loop try again.
2381         */
2382         if (state->state != CTDB_CONTROL_DONE) {
2383                 if (rmdata->status == MONITOR_OK) {
2384                         rmdata->status = MONITOR_FAILED;
2385                 }
2386                 return;
2387         }
2388
2389         /* if we got a response, then the recmode will be stored in the
2390            status field
2391         */
2392         if (state->status != CTDB_RECOVERY_NORMAL) {
2393                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2394                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2395         }
2396
2397         return;
2398 }
2399
2400
2401 /* verify that all nodes are in normal recovery mode */
2402 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2403 {
2404         struct verify_recmode_normal_data *rmdata;
2405         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2406         struct ctdb_client_control_state *state;
2407         enum monitor_result status;
2408         int j;
2409         
2410         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2411         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2412         rmdata->count  = 0;
2413         rmdata->status = MONITOR_OK;
2414
2415         /* loop over all active nodes and send an async getrecmode call to 
2416            them*/
2417         for (j=0; j<nodemap->num; j++) {
2418                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2419                         continue;
2420                 }
2421                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2422                                         CONTROL_TIMEOUT(), 
2423                                         nodemap->nodes[j].pnn);
2424                 if (state == NULL) {
2425                         /* we failed to send the control, treat this as 
2426                            an error and try again next iteration
2427                         */                      
2428                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2429                         talloc_free(mem_ctx);
2430                         return MONITOR_FAILED;
2431                 }
2432
2433                 /* set up the callback functions */
2434                 state->async.fn = verify_recmode_normal_callback;
2435                 state->async.private_data = rmdata;
2436
2437                 /* one more control to wait for to complete */
2438                 rmdata->count++;
2439         }
2440
2441
2442         /* now wait for up to the maximum number of seconds allowed
2443            or until all nodes we expect a response from has replied
2444         */
2445         while (rmdata->count > 0) {
2446                 event_loop_once(ctdb->ev);
2447         }
2448
2449         status = rmdata->status;
2450         talloc_free(mem_ctx);
2451         return status;
2452 }
2453
2454
2455 struct verify_recmaster_data {
2456         struct ctdb_recoverd *rec;
2457         uint32_t count;
2458         uint32_t pnn;
2459         enum monitor_result status;
2460 };
2461
2462 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2463 {
2464         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2465
2466
2467         /* one more node has responded with recmaster data*/
2468         rmdata->count--;
2469
2470         /* if we failed to get the recmaster, then return an error and let
2471            the main loop try again.
2472         */
2473         if (state->state != CTDB_CONTROL_DONE) {
2474                 if (rmdata->status == MONITOR_OK) {
2475                         rmdata->status = MONITOR_FAILED;
2476                 }
2477                 return;
2478         }
2479
2480         /* if we got a response, then the recmaster will be stored in the
2481            status field
2482         */
2483         if (state->status != rmdata->pnn) {
2484                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2485                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2486                 rmdata->status = MONITOR_ELECTION_NEEDED;
2487         }
2488
2489         return;
2490 }
2491
2492
2493 /* verify that all nodes agree that we are the recmaster */
2494 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2495 {
2496         struct ctdb_context *ctdb = rec->ctdb;
2497         struct verify_recmaster_data *rmdata;
2498         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2499         struct ctdb_client_control_state *state;
2500         enum monitor_result status;
2501         int j;
2502         
2503         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2504         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2505         rmdata->rec    = rec;
2506         rmdata->count  = 0;
2507         rmdata->pnn    = pnn;
2508         rmdata->status = MONITOR_OK;
2509
2510         /* loop over all active nodes and send an async getrecmaster call to 
2511            them*/
2512         for (j=0; j<nodemap->num; j++) {
2513                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2514                         continue;
2515                 }
2516                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2517                                         CONTROL_TIMEOUT(),
2518                                         nodemap->nodes[j].pnn);
2519                 if (state == NULL) {
2520                         /* we failed to send the control, treat this as 
2521                            an error and try again next iteration
2522                         */                      
2523                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2524                         talloc_free(mem_ctx);
2525                         return MONITOR_FAILED;
2526                 }
2527
2528                 /* set up the callback functions */
2529                 state->async.fn = verify_recmaster_callback;
2530                 state->async.private_data = rmdata;
2531
2532                 /* one more control to wait for to complete */
2533                 rmdata->count++;
2534         }
2535
2536
2537         /* now wait for up to the maximum number of seconds allowed
2538            or until all nodes we expect a response from has replied
2539         */
2540         while (rmdata->count > 0) {
2541                 event_loop_once(ctdb->ev);
2542         }
2543
2544         status = rmdata->status;
2545         talloc_free(mem_ctx);
2546         return status;
2547 }
2548
2549
2550 /* called to check that the local allocation of public ip addresses is ok.
2551 */
2552 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2553 {
2554         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2555         struct ctdb_control_get_ifaces *ifaces = NULL;
2556         struct ctdb_all_public_ips *ips = NULL;
2557         struct ctdb_uptime *uptime1 = NULL;
2558         struct ctdb_uptime *uptime2 = NULL;
2559         int ret, j;
2560         bool need_iface_check = false;
2561         bool need_takeover_run = false;
2562
2563         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2564                                 CTDB_CURRENT_NODE, &uptime1);
2565         if (ret != 0) {
2566                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2567                 talloc_free(mem_ctx);
2568                 return -1;
2569         }
2570
2571
2572         /* read the interfaces from the local node */
2573         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2574         if (ret != 0) {
2575                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2576                 talloc_free(mem_ctx);
2577                 return -1;
2578         }
2579
2580         if (!rec->ifaces) {
2581                 need_iface_check = true;
2582         } else if (rec->ifaces->num != ifaces->num) {
2583                 need_iface_check = true;
2584         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2585                 need_iface_check = true;
2586         }
2587
2588         if (need_iface_check) {
2589                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2590                                      "local node %u - force takeover run\n",
2591                                      pnn));
2592                 need_takeover_run = true;
2593         }
2594
2595         /* read the ip allocation from the local node */
2596         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2597         if (ret != 0) {
2598                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2599                 talloc_free(mem_ctx);
2600                 return -1;
2601         }
2602
2603         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2604                                 CTDB_CURRENT_NODE, &uptime2);
2605         if (ret != 0) {
2606                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2607                 talloc_free(mem_ctx);
2608                 return -1;
2609         }
2610
2611         /* skip the check if the startrecovery time has changed */
2612         if (timeval_compare(&uptime1->last_recovery_started,
2613                             &uptime2->last_recovery_started) != 0) {
2614                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2615                 talloc_free(mem_ctx);
2616                 return 0;
2617         }
2618
2619         /* skip the check if the endrecovery time has changed */
2620         if (timeval_compare(&uptime1->last_recovery_finished,
2621                             &uptime2->last_recovery_finished) != 0) {
2622                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2623                 talloc_free(mem_ctx);
2624                 return 0;
2625         }
2626
2627         /* skip the check if we have started but not finished recovery */
2628         if (timeval_compare(&uptime1->last_recovery_finished,
2629                             &uptime1->last_recovery_started) != 1) {
2630                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2631                 talloc_free(mem_ctx);
2632
2633                 return 0;
2634         }
2635
2636         talloc_free(rec->ifaces);
2637         rec->ifaces = talloc_steal(rec, ifaces);
2638
2639         /* verify that we have the ip addresses we should have
2640            and we dont have ones we shouldnt have.
2641            if we find an inconsistency we set recmode to
2642            active on the local node and wait for the recmaster
2643            to do a full blown recovery.
2644            also if the pnn is -1 and we are healthy and can host the ip
2645            we also request a ip reallocation.
2646         */
2647         if (ctdb->tunable.disable_ip_failover == 0) {
2648                 for (j=0; j<ips->num; j++) {
2649                         if (ips->ips[j].pnn == -1 && nodemap->nodes[pnn].flags == 0) {
2650                                 DEBUG(DEBUG_CRIT,("Public address '%s' is not assigned and we could serve this ip\n",
2651                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2652                                 need_takeover_run = true;
2653                         } else if (ips->ips[j].pnn == pnn) {
2654                                 if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2655                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2656                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2657                                         need_takeover_run = true;
2658                                 }
2659                         } else {
2660                                 if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2661                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2662                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2663                                         need_takeover_run = true;
2664                                 }
2665                         }
2666                 }
2667         }
2668
2669         if (need_takeover_run) {
2670                 struct takeover_run_reply rd;
2671                 TDB_DATA data;
2672
2673                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2674
2675                 rd.pnn = ctdb->pnn;
2676                 rd.srvid = 0;
2677                 data.dptr = (uint8_t *)&rd;
2678                 data.dsize = sizeof(rd);
2679
2680                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2681                 if (ret != 0) {
2682                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2683                 }
2684         }
2685         talloc_free(mem_ctx);
2686         return 0;
2687 }
2688
2689
2690 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2691 {
2692         struct ctdb_node_map **remote_nodemaps = callback_data;
2693
2694         if (node_pnn >= ctdb->num_nodes) {
2695                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2696                 return;
2697         }
2698
2699         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2700
2701 }
2702
2703 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2704         struct ctdb_node_map *nodemap,
2705         struct ctdb_node_map **remote_nodemaps)
2706 {
2707         uint32_t *nodes;
2708
2709         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2710         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2711                                         nodes, 0,
2712                                         CONTROL_TIMEOUT(), false, tdb_null,
2713                                         async_getnodemap_callback,
2714                                         NULL,
2715                                         remote_nodemaps) != 0) {
2716                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2717
2718                 return -1;
2719         }
2720
2721         return 0;
2722 }
2723
2724 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2725 struct ctdb_check_reclock_state {
2726         struct ctdb_context *ctdb;
2727         struct timeval start_time;
2728         int fd[2];
2729         pid_t child;
2730         struct timed_event *te;
2731         struct fd_event *fde;
2732         enum reclock_child_status status;
2733 };
2734
2735 /* when we free the reclock state we must kill any child process.
2736 */
2737 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2738 {
2739         struct ctdb_context *ctdb = state->ctdb;
2740
2741         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2742
2743         if (state->fd[0] != -1) {
2744                 close(state->fd[0]);
2745                 state->fd[0] = -1;
2746         }
2747         if (state->fd[1] != -1) {
2748                 close(state->fd[1]);
2749                 state->fd[1] = -1;
2750         }
2751         kill(state->child, SIGKILL);
2752         return 0;
2753 }
2754
2755 /*
2756   called if our check_reclock child times out. this would happen if
2757   i/o to the reclock file blocks.
2758  */
2759 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2760                                          struct timeval t, void *private_data)
2761 {
2762         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2763                                            struct ctdb_check_reclock_state);
2764
2765         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2766         state->status = RECLOCK_TIMEOUT;
2767 }
2768
2769 /* this is called when the child process has completed checking the reclock
2770    file and has written data back to us through the pipe.
2771 */
2772 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2773                              uint16_t flags, void *private_data)
2774 {
2775         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2776                                              struct ctdb_check_reclock_state);
2777         char c = 0;
2778         int ret;
2779
2780         /* we got a response from our child process so we can abort the
2781            timeout.
2782         */
2783         talloc_free(state->te);
2784         state->te = NULL;
2785
2786         ret = read(state->fd[0], &c, 1);
2787         if (ret != 1 || c != RECLOCK_OK) {
2788                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2789                 state->status = RECLOCK_FAILED;
2790
2791                 return;
2792         }
2793
2794         state->status = RECLOCK_OK;
2795         return;
2796 }
2797
2798 static int check_recovery_lock(struct ctdb_context *ctdb)
2799 {
2800         int ret;
2801         struct ctdb_check_reclock_state *state;
2802         pid_t parent = getpid();
2803
2804         if (ctdb->recovery_lock_fd == -1) {
2805                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2806                 return -1;
2807         }
2808
2809         state = talloc(ctdb, struct ctdb_check_reclock_state);
2810         CTDB_NO_MEMORY(ctdb, state);
2811
2812         state->ctdb = ctdb;
2813         state->start_time = timeval_current();
2814         state->status = RECLOCK_CHECKING;
2815         state->fd[0] = -1;
2816         state->fd[1] = -1;
2817
2818         ret = pipe(state->fd);
2819         if (ret != 0) {
2820                 talloc_free(state);
2821                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2822                 return -1;
2823         }
2824
2825         state->child = ctdb_fork(ctdb);
2826         if (state->child == (pid_t)-1) {
2827                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2828                 close(state->fd[0]);
2829                 state->fd[0] = -1;
2830                 close(state->fd[1]);
2831                 state->fd[1] = -1;
2832                 talloc_free(state);
2833                 return -1;
2834         }
2835
2836         if (state->child == 0) {
2837                 char cc = RECLOCK_OK;
2838                 close(state->fd[0]);
2839                 state->fd[0] = -1;
2840
2841                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
2842                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2843                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2844                         cc = RECLOCK_FAILED;
2845                 }
2846
2847                 write(state->fd[1], &cc, 1);
2848                 /* make sure we die when our parent dies */
2849                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2850                         sleep(5);
2851                         write(state->fd[1], &cc, 1);
2852                 }
2853                 _exit(0);
2854         }
2855         close(state->fd[1]);
2856         state->fd[1] = -1;
2857         set_close_on_exec(state->fd[0]);
2858
2859         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2860
2861         talloc_set_destructor(state, check_reclock_destructor);
2862
2863         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2864                                     ctdb_check_reclock_timeout, state);
2865         if (state->te == NULL) {
2866                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2867                 talloc_free(state);
2868                 return -1;
2869         }
2870
2871         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2872                                 EVENT_FD_READ,
2873                                 reclock_child_handler,
2874                                 (void *)state);
2875
2876         if (state->fde == NULL) {
2877                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2878                 talloc_free(state);
2879                 return -1;
2880         }
2881         tevent_fd_set_auto_close(state->fde);
2882
2883         while (state->status == RECLOCK_CHECKING) {
2884                 event_loop_once(ctdb->ev);
2885         }
2886
2887         if (state->status == RECLOCK_FAILED) {
2888                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2889                 close(ctdb->recovery_lock_fd);
2890                 ctdb->recovery_lock_fd = -1;
2891                 talloc_free(state);
2892                 return -1;
2893         }
2894
2895         talloc_free(state);
2896         return 0;
2897 }
2898
2899 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2900 {
2901         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2902         const char *reclockfile;
2903
2904         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2905                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2906                 talloc_free(tmp_ctx);
2907                 return -1;      
2908         }
2909
2910         if (reclockfile == NULL) {
2911                 if (ctdb->recovery_lock_file != NULL) {
2912                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2913                         talloc_free(ctdb->recovery_lock_file);
2914                         ctdb->recovery_lock_file = NULL;
2915                         if (ctdb->recovery_lock_fd != -1) {
2916                                 close(ctdb->recovery_lock_fd);
2917                                 ctdb->recovery_lock_fd = -1;
2918                         }
2919                 }
2920                 ctdb->tunable.verify_recovery_lock = 0;
2921                 talloc_free(tmp_ctx);
2922                 return 0;
2923         }
2924
2925         if (ctdb->recovery_lock_file == NULL) {
2926                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2927                 if (ctdb->recovery_lock_fd != -1) {
2928                         close(ctdb->recovery_lock_fd);
2929                         ctdb->recovery_lock_fd = -1;
2930                 }
2931                 talloc_free(tmp_ctx);
2932                 return 0;
2933         }
2934
2935
2936         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2937                 talloc_free(tmp_ctx);
2938                 return 0;
2939         }
2940
2941         talloc_free(ctdb->recovery_lock_file);
2942         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2943         ctdb->tunable.verify_recovery_lock = 0;
2944         if (ctdb->recovery_lock_fd != -1) {
2945                 close(ctdb->recovery_lock_fd);
2946                 ctdb->recovery_lock_fd = -1;
2947         }
2948
2949         talloc_free(tmp_ctx);
2950         return 0;
2951 }
2952
2953 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2954                       TALLOC_CTX *mem_ctx)
2955 {
2956         uint32_t pnn;
2957         struct ctdb_node_map *nodemap=NULL;
2958         struct ctdb_node_map *recmaster_nodemap=NULL;
2959         struct ctdb_node_map **remote_nodemaps=NULL;
2960         struct ctdb_vnn_map *vnnmap=NULL;
2961         struct ctdb_vnn_map *remote_vnnmap=NULL;
2962         int32_t debug_level;
2963         int i, j, ret;
2964
2965
2966
2967         /* verify that the main daemon is still running */
2968         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2969                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2970                 exit(-1);
2971         }
2972
2973         /* ping the local daemon to tell it we are alive */
2974         ctdb_ctrl_recd_ping(ctdb);
2975
2976         if (rec->election_timeout) {
2977                 /* an election is in progress */
2978                 return;
2979         }
2980
2981         /* read the debug level from the parent and update locally */
2982         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2983         if (ret !=0) {
2984                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2985                 return;
2986         }
2987         LogLevel = debug_level;
2988
2989
2990         /* We must check if we need to ban a node here but we want to do this
2991            as early as possible so we dont wait until we have pulled the node
2992            map from the local node. thats why we have the hardcoded value 20
2993         */
2994         for (i=0; i<ctdb->num_nodes; i++) {
2995                 struct ctdb_banning_state *ban_state;
2996
2997                 if (ctdb->nodes[i]->ban_state == NULL) {
2998                         continue;
2999                 }
3000                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
3001                 if (ban_state->count < 20) {
3002                         continue;
3003                 }
3004                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
3005                         ctdb->nodes[i]->pnn, ban_state->count,
3006                         ctdb->tunable.recovery_ban_period));
3007                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
3008                 ban_state->count = 0;
3009         }
3010
3011         /* get relevant tunables */
3012         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3013         if (ret != 0) {
3014                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3015                 return;
3016         }
3017
3018         /* get the current recovery lock file from the server */
3019         if (update_recovery_lock_file(ctdb) != 0) {
3020                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3021                 return;
3022         }
3023
3024         /* Make sure that if recovery lock verification becomes disabled when
3025            we close the file
3026         */
3027         if (ctdb->tunable.verify_recovery_lock == 0) {
3028                 if (ctdb->recovery_lock_fd != -1) {
3029                         close(ctdb->recovery_lock_fd);
3030                         ctdb->recovery_lock_fd = -1;
3031                 }
3032         }
3033
3034         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3035         if (pnn == (uint32_t)-1) {
3036                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
3037                 return;
3038         }
3039
3040         /* get the vnnmap */
3041         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3042         if (ret != 0) {
3043                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3044                 return;
3045         }
3046
3047
3048         /* get number of nodes */
3049         if (rec->nodemap) {
3050                 talloc_free(rec->nodemap);
3051                 rec->nodemap = NULL;
3052                 nodemap=NULL;
3053         }
3054         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3055         if (ret != 0) {
3056                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3057                 return;
3058         }
3059         nodemap = rec->nodemap;
3060
3061         /* check which node is the recovery master */
3062         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3063         if (ret != 0) {
3064                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3065                 return;
3066         }
3067
3068         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3069         if (rec->recmaster != pnn) {
3070                 if (rec->ip_reallocate_ctx != NULL) {
3071                         talloc_free(rec->ip_reallocate_ctx);
3072                         rec->ip_reallocate_ctx = NULL;
3073                         rec->reallocate_callers = NULL;
3074                 }
3075         }
3076         /* if there are takeovers requested, perform it and notify the waiters */
3077         if (rec->reallocate_callers) {
3078                 process_ipreallocate_requests(ctdb, rec);
3079         }
3080
3081         if (rec->recmaster == (uint32_t)-1) {
3082                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3083                 force_election(rec, pnn, nodemap);
3084                 return;
3085         }
3086
3087
3088         /* if the local daemon is STOPPED, we verify that the databases are
3089            also frozen and thet the recmode is set to active 
3090         */
3091         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3092                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3093                 if (ret != 0) {
3094                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3095                 }
3096                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3097                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3098
3099                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3100                         if (ret != 0) {
3101                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3102                                 return;
3103                         }
3104                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3105                         if (ret != 0) {
3106                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3107
3108                                 return;
3109                         }
3110                         return;
3111                 }
3112         }
3113         /* If the local node is stopped, verify we are not the recmaster 
3114            and yield this role if so
3115         */
3116         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3117                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3118                 force_election(rec, pnn, nodemap);
3119                 return;
3120         }
3121         
3122         /* check that we (recovery daemon) and the local ctdb daemon
3123            agrees on whether we are banned or not
3124         */
3125 //qqq
3126
3127         /* remember our own node flags */
3128         rec->node_flags = nodemap->nodes[pnn].flags;
3129
3130         /* count how many active nodes there are */
3131         rec->num_active    = 0;
3132         rec->num_connected = 0;
3133         for (i=0; i<nodemap->num; i++) {
3134                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3135                         rec->num_active++;
3136                 }
3137                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3138                         rec->num_connected++;
3139                 }
3140         }
3141
3142
3143         /* verify that the recmaster node is still active */
3144         for (j=0; j<nodemap->num; j++) {
3145                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3146                         break;
3147                 }
3148         }
3149
3150         if (j == nodemap->num) {
3151                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3152                 force_election(rec, pnn, nodemap);
3153                 return;
3154         }
3155
3156         /* if recovery master is disconnected we must elect a new recmaster */
3157         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3158                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3159                 force_election(rec, pnn, nodemap);
3160                 return;
3161         }
3162
3163         /* grap the nodemap from the recovery master to check if it is banned */
3164         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3165                                    mem_ctx, &recmaster_nodemap);
3166         if (ret != 0) {
3167                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3168                           nodemap->nodes[j].pnn));
3169                 return;
3170         }
3171
3172
3173         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3174                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3175                 force_election(rec, pnn, nodemap);
3176                 return;
3177         }
3178
3179
3180         /* verify that we have all ip addresses we should have and we dont
3181          * have addresses we shouldnt have.
3182          */ 
3183         if (ctdb->tunable.disable_ip_failover == 0) {
3184                 if (rec->ip_check_disable_ctx == NULL) {
3185                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3186                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3187                         }
3188                 }
3189         }
3190
3191
3192         /* if we are not the recmaster then we do not need to check
3193            if recovery is needed
3194          */
3195         if (pnn != rec->recmaster) {
3196                 return;
3197         }
3198
3199
3200         /* ensure our local copies of flags are right */
3201         ret = update_local_flags(rec, nodemap);
3202         if (ret == MONITOR_ELECTION_NEEDED) {
3203                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3204                 force_election(rec, pnn, nodemap);
3205                 return;
3206         }
3207         if (ret != MONITOR_OK) {
3208                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3209                 return;
3210         }
3211
3212         if (ctdb->num_nodes != nodemap->num) {
3213                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3214                 reload_nodes_file(ctdb);
3215                 return;
3216         }
3217
3218         /* verify that all active nodes agree that we are the recmaster */
3219         switch (verify_recmaster(rec, nodemap, pnn)) {
3220         case MONITOR_RECOVERY_NEEDED:
3221                 /* can not happen */
3222                 return;
3223         case MONITOR_ELECTION_NEEDED:
3224                 force_election(rec, pnn, nodemap);
3225                 return;
3226         case MONITOR_OK:
3227                 break;
3228         case MONITOR_FAILED:
3229                 return;
3230         }
3231
3232
3233         if (rec->need_recovery) {
3234                 /* a previous recovery didn't finish */
3235                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3236                 return;
3237         }
3238
3239         /* verify that all active nodes are in normal mode 
3240            and not in recovery mode 
3241         */
3242         switch (verify_recmode(ctdb, nodemap)) {
3243         case MONITOR_RECOVERY_NEEDED:
3244                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3245                 return;
3246         case MONITOR_FAILED:
3247                 return;
3248         case MONITOR_ELECTION_NEEDED:
3249                 /* can not happen */
3250         case MONITOR_OK:
3251                 break;
3252         }
3253
3254
3255         if (ctdb->tunable.verify_recovery_lock != 0) {
3256                 /* we should have the reclock - check its not stale */
3257                 ret = check_recovery_lock(ctdb);
3258                 if (ret != 0) {
3259                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3260                         ctdb_set_culprit(rec, ctdb->pnn);
3261                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3262                         return;
3263                 }
3264         }
3265
3266         /* get the nodemap for all active remote nodes
3267          */
3268         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3269         if (remote_nodemaps == NULL) {
3270                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3271                 return;
3272         }
3273         for(i=0; i<nodemap->num; i++) {
3274                 remote_nodemaps[i] = NULL;
3275         }
3276         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3277                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3278                 return;
3279         } 
3280
3281         /* verify that all other nodes have the same nodemap as we have
3282         */
3283         for (j=0; j<nodemap->num; j++) {
3284                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3285                         continue;
3286                 }
3287
3288                 if (remote_nodemaps[j] == NULL) {
3289                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3290                         ctdb_set_culprit(rec, j);
3291
3292                         return;
3293                 }
3294
3295                 /* if the nodes disagree on how many nodes there are
3296                    then this is a good reason to try recovery
3297                  */
3298                 if (remote_nodemaps[j]->num != nodemap->num) {
3299                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3300                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3301                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3302                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3303                         return;
3304                 }
3305
3306                 /* if the nodes disagree on which nodes exist and are
3307                    active, then that is also a good reason to do recovery
3308                  */
3309                 for (i=0;i<nodemap->num;i++) {
3310                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3311                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3312                                           nodemap->nodes[j].pnn, i, 
3313                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3314                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3315                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3316                                             vnnmap);
3317                                 return;
3318                         }
3319                 }
3320
3321                 /* verify the flags are consistent
3322                 */
3323                 for (i=0; i<nodemap->num; i++) {
3324                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3325                                 continue;
3326                         }
3327                         
3328                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3329                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3330                                   nodemap->nodes[j].pnn, 
3331                                   nodemap->nodes[i].pnn, 
3332                                   remote_nodemaps[j]->nodes[i].flags,
3333                                   nodemap->nodes[j].flags));
3334                                 if (i == j) {
3335                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3336                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3337                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3338                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3339                                                     vnnmap);
3340                                         return;
3341                                 } else {
3342                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3343                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3344                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3345                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3346                                                     vnnmap);
3347                                         return;
3348                                 }
3349                         }
3350                 }
3351         }
3352
3353
3354         /* there better be the same number of lmasters in the vnn map
3355            as there are active nodes or we will have to do a recovery
3356          */
3357         if (vnnmap->size != rec->num_active) {
3358                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3359                           vnnmap->size, rec->num_active));
3360                 ctdb_set_culprit(rec, ctdb->pnn);
3361                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3362                 return;
3363         }
3364
3365         /* verify that all active nodes in the nodemap also exist in 
3366            the vnnmap.
3367          */
3368         for (j=0; j<nodemap->num; j++) {
3369                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3370                         continue;
3371                 }
3372                 if (nodemap->nodes[j].pnn == pnn) {
3373                         continue;
3374                 }
3375
3376                 for (i=0; i<vnnmap->size; i++) {
3377                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3378                                 break;
3379                         }
3380                 }
3381                 if (i == vnnmap->size) {
3382                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3383                                   nodemap->nodes[j].pnn));
3384                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3385                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3386                         return;
3387                 }
3388         }
3389
3390         
3391         /* verify that all other nodes have the same vnnmap
3392            and are from the same generation
3393          */
3394         for (j=0; j<nodemap->num; j++) {
3395                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3396                         continue;
3397                 }
3398                 if (nodemap->nodes[j].pnn == pnn) {
3399                         continue;
3400                 }
3401
3402                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3403                                           mem_ctx, &remote_vnnmap);
3404                 if (ret != 0) {
3405                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3406                                   nodemap->nodes[j].pnn));
3407                         return;
3408                 }
3409
3410                 /* verify the vnnmap generation is the same */
3411                 if (vnnmap->generation != remote_vnnmap->generation) {
3412                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3413                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3414                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3415                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3416                         return;
3417                 }
3418
3419                 /* verify the vnnmap size is the same */
3420                 if (vnnmap->size != remote_vnnmap->size) {
3421                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3422                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3423                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3424                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3425                         return;
3426                 }
3427
3428                 /* verify the vnnmap is the same */
3429                 for (i=0;i<vnnmap->size;i++) {
3430                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3431                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3432                                           nodemap->nodes[j].pnn));
3433                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3434                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3435                                             vnnmap);
3436                                 return;
3437                         }
3438                 }
3439         }
3440
3441         /* we might need to change who has what IP assigned */
3442         if (rec->need_takeover_run) {
3443                 uint32_t culprit = (uint32_t)-1;
3444
3445                 rec->need_takeover_run = false;
3446
3447                 /* update the list of public ips that a node can handle for
3448                    all connected nodes
3449                 */
3450                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3451                 if (ret != 0) {
3452                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3453                                          culprit));
3454                         rec->need_takeover_run = true;
3455                         return;
3456                 }
3457
3458                 /* execute the "startrecovery" event script on all nodes */
3459                 ret = run_startrecovery_eventscript(rec, nodemap);
3460                 if (ret!=0) {
3461                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3462                         ctdb_set_culprit(rec, ctdb->pnn);
3463                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3464                         return;
3465                 }
3466
3467                 ret = ctdb_takeover_run(ctdb, nodemap);
3468                 if (ret != 0) {
3469                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Try again later\n"));
3470                         return;
3471                 }
3472
3473                 /* execute the "recovered" event script on all nodes */
3474                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3475 #if 0
3476 // we cant check whether the event completed successfully
3477 // since this script WILL fail if the node is in recovery mode
3478 // and if that race happens, the code here would just cause a second
3479 // cascading recovery.
3480                 if (ret!=0) {
3481                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3482                         ctdb_set_culprit(rec, ctdb->pnn);
3483                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3484                 }
3485 #endif
3486         }
3487 }
3488
3489 /*
3490   the main monitoring loop
3491  */
3492 static void monitor_cluster(struct ctdb_context *ctdb)
3493 {
3494         struct ctdb_recoverd *rec;
3495
3496         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3497
3498         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3499         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3500
3501         rec->ctdb = ctdb;
3502
3503         rec->priority_time = timeval_current();
3504
3505         /* register a message port for sending memory dumps */
3506         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3507
3508         /* register a message port for recovery elections */
3509         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3510
3511         /* when nodes are disabled/enabled */
3512         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3513
3514         /* when we are asked to puch out a flag change */
3515         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3516
3517         /* register a message port for vacuum fetch */
3518         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3519
3520         /* register a message port for reloadnodes  */
3521         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3522
3523         /* register a message port for performing a takeover run */
3524         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3525
3526         /* register a message port for disabling the ip check for a short while */
3527         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3528
3529         /* register a message port for updating the recovery daemons node assignment for an ip */
3530         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3531
3532         /* register a message port for forcing a rebalance of a node next
3533            reallocation */
3534         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3535
3536         for (;;) {
3537                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3538                 struct timeval start;
3539                 double elapsed;
3540
3541                 if (!mem_ctx) {
3542                         DEBUG(DEBUG_CRIT,(__location__
3543                                           " Failed to create temp context\n"));
3544                         exit(-1);
3545                 }
3546
3547                 start = timeval_current();
3548                 main_loop(ctdb, rec, mem_ctx);
3549                 talloc_free(mem_ctx);
3550
3551                 /* we only check for recovery once every second */
3552                 elapsed = timeval_elapsed(&start);
3553                 if (elapsed < ctdb->tunable.recover_interval) {
3554                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3555                                           - elapsed);
3556                 }
3557         }
3558 }
3559
3560 /*
3561   event handler for when the main ctdbd dies
3562  */
3563 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3564                                  uint16_t flags, void *private_data)
3565 {
3566         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3567         _exit(1);
3568 }
3569
3570 /*
3571   called regularly to verify that the recovery daemon is still running
3572  */
3573 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3574                               struct timeval yt, void *p)
3575 {
3576         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3577
3578         if (kill(ctdb->recoverd_pid, 0) != 0) {
3579                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3580
3581                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3582                                 ctdb_restart_recd, ctdb);
3583
3584                 return;
3585         }
3586
3587         event_add_timed(ctdb->ev, ctdb, 
3588                         timeval_current_ofs(30, 0),
3589                         ctdb_check_recd, ctdb);
3590 }
3591
3592 static void recd_sig_child_handler(struct event_context *ev,
3593         struct signal_event *se, int signum, int count,
3594         void *dont_care, 
3595         void *private_data)
3596 {
3597 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3598         int status;
3599         pid_t pid = -1;
3600
3601         while (pid != 0) {
3602                 pid = waitpid(-1, &status, WNOHANG);
3603                 if (pid == -1) {
3604                         if (errno != ECHILD) {
3605                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3606                         }
3607                         return;
3608                 }
3609                 if (pid > 0) {
3610                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3611                 }
3612         }
3613 }
3614
3615 /*
3616   startup the recovery daemon as a child of the main ctdb daemon
3617  */
3618 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3619 {
3620         int fd[2];
3621         struct signal_event *se;
3622         struct tevent_fd *fde;
3623
3624         if (pipe(fd) != 0) {
3625                 return -1;
3626         }
3627
3628         ctdb->ctdbd_pid = getpid();
3629
3630         ctdb->recoverd_pid = fork();
3631         if (ctdb->recoverd_pid == -1) {
3632                 return -1;
3633         }
3634         
3635         if (ctdb->recoverd_pid != 0) {
3636                 close(fd[0]);
3637                 event_add_timed(ctdb->ev, ctdb, 
3638                                 timeval_current_ofs(30, 0),
3639                                 ctdb_check_recd, ctdb);
3640                 return 0;
3641         }
3642
3643         close(fd[1]);
3644
3645         srandom(getpid() ^ time(NULL));
3646
3647         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3648                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3649                 exit(1);
3650         }
3651
3652         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3653
3654         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3655                      ctdb_recoverd_parent, &fd[0]);     
3656         tevent_fd_set_auto_close(fde);
3657
3658         /* set up a handler to pick up sigchld */
3659         se = event_add_signal(ctdb->ev, ctdb,
3660                                      SIGCHLD, 0,
3661                                      recd_sig_child_handler,
3662                                      ctdb);
3663         if (se == NULL) {
3664                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3665                 exit(1);
3666         }
3667
3668         monitor_cluster(ctdb);
3669
3670         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3671         return -1;
3672 }
3673
3674 /*
3675   shutdown the recovery daemon
3676  */
3677 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3678 {
3679         if (ctdb->recoverd_pid == 0) {
3680                 return;
3681         }
3682
3683         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3684         kill(ctdb->recoverd_pid, SIGTERM);
3685 }
3686
3687 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
3688                        struct timeval t, void *private_data)
3689 {
3690         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3691
3692         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3693         ctdb_stop_recoverd(ctdb);
3694         ctdb_start_recoverd(ctdb);
3695 }