recoverd: try to become the recovery master if we have the capability, but the curren...
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
70 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
71 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
72
73 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
74
75 /*
76   ban a node for a period of time
77  */
78 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
79 {
80         int ret;
81         struct ctdb_context *ctdb = rec->ctdb;
82         struct ctdb_ban_time bantime;
83        
84         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
85
86         if (!ctdb_validate_pnn(ctdb, pnn)) {
87                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
88                 return;
89         }
90
91         bantime.pnn  = pnn;
92         bantime.time = ban_time;
93
94         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
95         if (ret != 0) {
96                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
97                 return;
98         }
99
100 }
101
102 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
103
104
105 /*
106   run the "recovered" eventscript on all nodes
107  */
108 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
109 {
110         TALLOC_CTX *tmp_ctx;
111         uint32_t *nodes;
112
113         tmp_ctx = talloc_new(ctdb);
114         CTDB_NO_MEMORY(ctdb, tmp_ctx);
115
116         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
117         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
118                                         nodes, 0,
119                                         CONTROL_TIMEOUT(), false, tdb_null,
120                                         NULL, NULL,
121                                         NULL) != 0) {
122                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
123
124                 talloc_free(tmp_ctx);
125                 return -1;
126         }
127
128         talloc_free(tmp_ctx);
129         return 0;
130 }
131
132 /*
133   remember the trouble maker
134  */
135 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
136 {
137         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
138         struct ctdb_banning_state *ban_state;
139
140         if (culprit >= ctdb->num_nodes) {
141                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
142                 return;
143         }
144
145         if (ctdb->nodes[culprit]->ban_state == NULL) {
146                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
147                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
148
149                 
150         }
151         ban_state = ctdb->nodes[culprit]->ban_state;
152         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
153                 /* this was the first time in a long while this node
154                    misbehaved so we will forgive any old transgressions.
155                 */
156                 ban_state->count = 0;
157         }
158
159         ban_state->count += count;
160         ban_state->last_reported_time = timeval_current();
161         rec->last_culprit_node = culprit;
162 }
163
164 /*
165   remember the trouble maker
166  */
167 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
168 {
169         ctdb_set_culprit_count(rec, culprit, 1);
170 }
171
172
173 /* this callback is called for every node that failed to execute the
174    start recovery event
175 */
176 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
177 {
178         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
179
180         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
181
182         ctdb_set_culprit(rec, node_pnn);
183 }
184
185 /*
186   run the "startrecovery" eventscript on all nodes
187  */
188 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
189 {
190         TALLOC_CTX *tmp_ctx;
191         uint32_t *nodes;
192         struct ctdb_context *ctdb = rec->ctdb;
193
194         tmp_ctx = talloc_new(ctdb);
195         CTDB_NO_MEMORY(ctdb, tmp_ctx);
196
197         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
198         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
199                                         nodes, 0,
200                                         CONTROL_TIMEOUT(), false, tdb_null,
201                                         NULL,
202                                         startrecovery_fail_callback,
203                                         rec) != 0) {
204                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
205                 talloc_free(tmp_ctx);
206                 return -1;
207         }
208
209         talloc_free(tmp_ctx);
210         return 0;
211 }
212
213 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
214 {
215         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
216                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
217                 return;
218         }
219         if (node_pnn < ctdb->num_nodes) {
220                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
221         }
222
223         if (node_pnn == ctdb->pnn) {
224                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
225         }
226 }
227
228 /*
229   update the node capabilities for all connected nodes
230  */
231 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
232 {
233         uint32_t *nodes;
234         TALLOC_CTX *tmp_ctx;
235
236         tmp_ctx = talloc_new(ctdb);
237         CTDB_NO_MEMORY(ctdb, tmp_ctx);
238
239         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
240         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
241                                         nodes, 0,
242                                         CONTROL_TIMEOUT(),
243                                         false, tdb_null,
244                                         async_getcap_callback, NULL,
245                                         NULL) != 0) {
246                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
247                 talloc_free(tmp_ctx);
248                 return -1;
249         }
250
251         talloc_free(tmp_ctx);
252         return 0;
253 }
254
255 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
256 {
257         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
258
259         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
260         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
261 }
262
263 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
264 {
265         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
266
267         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
268         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
269 }
270
271 /*
272   change recovery mode on all nodes
273  */
274 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
275 {
276         TDB_DATA data;
277         uint32_t *nodes;
278         TALLOC_CTX *tmp_ctx;
279
280         tmp_ctx = talloc_new(ctdb);
281         CTDB_NO_MEMORY(ctdb, tmp_ctx);
282
283         /* freeze all nodes */
284         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
285         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
286                 int i;
287
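                /* when switching to recovery mode, freeze the databases on
                   all active nodes one priority level at a time; a node
                   that fails to freeze is charged culprit credits via
                   set_recmode_fail_callback */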
288                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
289                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
290                                                 nodes, i,
291                                                 CONTROL_TIMEOUT(),
292                                                 false, tdb_null,
293                                                 NULL,
294                                                 set_recmode_fail_callback,
295                                                 rec) != 0) {
296                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
297                                 talloc_free(tmp_ctx);
298                                 return -1;
299                         }
300                 }
301         }
302
303
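        /* broadcast the requested recovery mode to all active nodes */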
304         data.dsize = sizeof(uint32_t);
305         data.dptr = (unsigned char *)&rec_mode;
306
307         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
308                                         nodes, 0,
309                                         CONTROL_TIMEOUT(),
310                                         false, data,
311                                         NULL, NULL,
312                                         NULL) != 0) {
313                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
314                 talloc_free(tmp_ctx);
315                 return -1;
316         }
317
318         talloc_free(tmp_ctx);
319         return 0;
320 }
321
322 /*
323   change the recovery master on all nodes
324  */
325 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
326 {
327         TDB_DATA data;
328         TALLOC_CTX *tmp_ctx;
329         uint32_t *nodes;
330
331         tmp_ctx = talloc_new(ctdb);
332         CTDB_NO_MEMORY(ctdb, tmp_ctx);
333
334         data.dsize = sizeof(uint32_t);
335         data.dptr = (unsigned char *)&pnn;
336
337         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
338         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
339                                         nodes, 0,
340                                         CONTROL_TIMEOUT(), false, data,
341                                         NULL, NULL,
342                                         NULL) != 0) {
343                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
344                 talloc_free(tmp_ctx);
345                 return -1;
346         }
347
348         talloc_free(tmp_ctx);
349         return 0;
350 }
351
352 /* update all remote nodes to use the same db priority that we have
353    this can fail if the remote node has not yet been upgraded to
354    support this function, so we always return success and never fail
355    a recovery if this call fails.
356 */
357 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
358         struct ctdb_node_map *nodemap, 
359         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
360 {
361         int db;
362         uint32_t *nodes;
363
364         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
365
366         /* step through all local databases */
367         for (db=0; db<dbmap->num;db++) {
368                 TDB_DATA data;
369                 struct ctdb_db_priority db_prio;
370                 int ret;
371
372                 db_prio.db_id     = dbmap->dbs[db].dbid;
373                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
374                 if (ret != 0) {
375                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
376                         continue;
377                 }
378
379                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
380
381                 data.dptr  = (uint8_t *)&db_prio;
382                 data.dsize = sizeof(db_prio);
383
384                 if (ctdb_client_async_control(ctdb,
385                                         CTDB_CONTROL_SET_DB_PRIORITY,
386                                         nodes, 0,
387                                         CONTROL_TIMEOUT(), false, data,
388                                         NULL, NULL,
389                                         NULL) != 0) {
390                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
391                 }
392         }
393
394         return 0;
395 }                       
396
397 /*
398   ensure all other nodes have attached to any databases that we have
399  */
400 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
401                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
402 {
403         int i, j, db, ret;
404         struct ctdb_dbid_map *remote_dbmap;
405
406         /* verify that all other nodes have all our databases */
407         for (j=0; j<nodemap->num; j++) {
408                 /* we don't need to check ourselves */
409                 if (nodemap->nodes[j].pnn == pnn) {
410                         continue;
411                 }
412                 /* don't check nodes that are unavailable */
413                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
414                         continue;
415                 }
416
417                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
418                                          mem_ctx, &remote_dbmap);
419                 if (ret != 0) {
420                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
421                         return -1;
422                 }
423
424                 /* step through all local databases */
425                 for (db=0; db<dbmap->num;db++) {
426                         const char *name;
427
428
429                         for (i=0;i<remote_dbmap->num;i++) {
430                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
431                                         break;
432                                 }
433                         }
434                         /* the remote node already has this database */
435                         if (i!=remote_dbmap->num) {
436                                 continue;
437                         }
438                         /* ok so we need to create this database */
439                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
440                                             mem_ctx, &name);
441                         if (ret != 0) {
442                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
443                                 return -1;
444                         }
445                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
446                                            mem_ctx, name,
447                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
448                         if (ret != 0) {
449                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
450                                 return -1;
451                         }
452                 }
453         }
454
455         return 0;
456 }
457
458
459 /*
460   ensure we are attached to any databases that anyone else is attached to
461  */
462 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
463                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
464 {
465         int i, j, db, ret;
466         struct ctdb_dbid_map *remote_dbmap;
467
468         /* verify that we have all databases any other node has */
469         for (j=0; j<nodemap->num; j++) {
470                 /* we don't need to check ourselves */
471                 if (nodemap->nodes[j].pnn == pnn) {
472                         continue;
473                 }
474                 /* don't check nodes that are unavailable */
475                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
476                         continue;
477                 }
478
479                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
480                                          mem_ctx, &remote_dbmap);
481                 if (ret != 0) {
482                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
483                         return -1;
484                 }
485
486                 /* step through all databases on the remote node */
487                 for (db=0; db<remote_dbmap->num;db++) {
488                         const char *name;
489
490                         for (i=0;i<(*dbmap)->num;i++) {
491                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
492                                         break;
493                                 }
494                         }
495                         /* we already have this db locally */
496                         if (i!=(*dbmap)->num) {
497                                 continue;
498                         }
499                         /* ok so we need to create this database and
500                            rebuild dbmap
501                          */
502                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
503                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
504                         if (ret != 0) {
505                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
506                                           nodemap->nodes[j].pnn));
507                                 return -1;
508                         }
509                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
510                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
511                         if (ret != 0) {
512                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
513                                 return -1;
514                         }
515                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
516                         if (ret != 0) {
517                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
518                                 return -1;
519                         }
520                 }
521         }
522
523         return 0;
524 }
525
526
527 /*
528   pull the remote database contents from one node into the recdb
529  */
530 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
531                                     struct tdb_wrap *recdb, uint32_t dbid,
532                                     bool persistent)
533 {
534         int ret;
535         TDB_DATA outdata;
536         struct ctdb_marshall_buffer *reply;
537         struct ctdb_rec_data *rec;
538         int i;
539         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
540
541         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
542                                CONTROL_TIMEOUT(), &outdata);
543         if (ret != 0) {
544                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
545                 talloc_free(tmp_ctx);
546                 return -1;
547         }
548
549         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
550
551         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
552                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
553                 talloc_free(tmp_ctx);
554                 return -1;
555         }
556         
557         rec = (struct ctdb_rec_data *)&reply->data[0];
558         
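        /* walk the packed records in the marshall buffer; each entry is a
           length-prefixed ctdb_rec_data holding the key, the ltdb header
           and the data */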
559         for (i=0;
560              i<reply->count;
561              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
562                 TDB_DATA key, data;
563                 struct ctdb_ltdb_header *hdr;
564                 TDB_DATA existing;
565                 
566                 key.dptr = &rec->data[0];
567                 key.dsize = rec->keylen;
568                 data.dptr = &rec->data[key.dsize];
569                 data.dsize = rec->datalen;
570                 
571                 hdr = (struct ctdb_ltdb_header *)data.dptr;
572
573                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
574                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
575                         talloc_free(tmp_ctx);
576                         return -1;
577                 }
578
579                 /* fetch the existing record, if any */
580                 existing = tdb_fetch(recdb->tdb, key);
581                 
582                 if (existing.dptr != NULL) {
583                         struct ctdb_ltdb_header header;
584                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
585                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
586                                          (unsigned)existing.dsize, srcnode));
587                                 free(existing.dptr);
588                                 talloc_free(tmp_ctx);
589                                 return -1;
590                         }
591                         header = *(struct ctdb_ltdb_header *)existing.dptr;
592                         free(existing.dptr);
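                        /* replace the record we already have only if the
                           pulled copy has a higher rsn, or the same rsn
                           while our existing copy's dmaster is not the
                           current recovery master */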
593                         if (!(header.rsn < hdr->rsn ||
594                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
595                                 continue;
596                         }
597                 }
598                 
599                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
600                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
601                         talloc_free(tmp_ctx);
602                         return -1;                              
603                 }
604         }
605
606         talloc_free(tmp_ctx);
607
608         return 0;
609 }
610
611 /*
612   pull all the remote database contents into the recdb
613  */
614 static int pull_remote_database(struct ctdb_context *ctdb,
615                                 struct ctdb_recoverd *rec, 
616                                 struct ctdb_node_map *nodemap, 
617                                 struct tdb_wrap *recdb, uint32_t dbid,
618                                 bool persistent)
619 {
620         int j;
621
622         /* pull all records from all other nodes across onto this node
623            (this merges based on rsn)
624         */
625         for (j=0; j<nodemap->num; j++) {
626                 /* don't merge from nodes that are unavailable */
627                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
628                         continue;
629                 }
630                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
631                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
632                                  nodemap->nodes[j].pnn));
633                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
634                         return -1;
635                 }
636         }
637         
638         return 0;
639 }
640
641
642 /*
643   update flags on all active nodes
644  */
645 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
646 {
647         int ret;
648
649         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
650         if (ret != 0) {
651                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
652                 return -1;
653         }
654
655         return 0;
656 }
657
658 /*
659   ensure all nodes have the same vnnmap we do
660  */
661 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
662                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
663 {
664         int j, ret;
665
666         /* push the new vnn map out to all the nodes */
667         for (j=0; j<nodemap->num; j++) {
668                 /* don't push to nodes that are unavailable */
669                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
670                         continue;
671                 }
672
673                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
674                 if (ret != 0) {
675                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
676                         return -1;
677                 }
678         }
679
680         return 0;
681 }
682
683
684 struct vacuum_info {
685         struct vacuum_info *next, *prev;
686         struct ctdb_recoverd *rec;
687         uint32_t srcnode;
688         struct ctdb_db_context *ctdb_db;
689         struct ctdb_marshall_buffer *recs;
690         struct ctdb_rec_data *r;
691 };
692
693 static void vacuum_fetch_next(struct vacuum_info *v);
694
695 /*
696   called when a vacuum fetch has completed - just free it and do the next one
697  */
698 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
699 {
700         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
701         talloc_free(state);
702         vacuum_fetch_next(v);
703 }
704
705
706 /*
707   process the next element from the vacuum list
708 */
709 static void vacuum_fetch_next(struct vacuum_info *v)
710 {
711         struct ctdb_call call;
712         struct ctdb_rec_data *r;
713
714         while (v->recs->count) {
715                 struct ctdb_client_call_state *state;
716                 TDB_DATA data;
717                 struct ctdb_ltdb_header *hdr;
718
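                /* migrate the record back to this node with a no-op call:
                   CTDB_NULL_FUNC with CTDB_IMMEDIATE_MIGRATION pulls the
                   dmaster role here without running any call function */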
719                 ZERO_STRUCT(call);
720                 call.call_id = CTDB_NULL_FUNC;
721                 call.flags = CTDB_IMMEDIATE_MIGRATION;
722                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
723
724                 r = v->r;
725                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
726                 v->recs->count--;
727
728                 call.key.dptr = &r->data[0];
729                 call.key.dsize = r->keylen;
730
731                 /* ensure we don't block this daemon - just skip a record if we can't get
732                    the chainlock */
733                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
734                         continue;
735                 }
736
737                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
738                 if (data.dptr == NULL) {
739                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
740                         continue;
741                 }
742
743                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
744                         free(data.dptr);
745                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
746                         continue;
747                 }
748                 
749                 hdr = (struct ctdb_ltdb_header *)data.dptr;
750                 if (hdr->dmaster == v->rec->ctdb->pnn) {
751                         /* its already local */
752                         free(data.dptr);
753                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
754                         continue;
755                 }
756
757                 free(data.dptr);
758
759                 state = ctdb_call_send(v->ctdb_db, &call);
760                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
761                 if (state == NULL) {
762                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
763                         talloc_free(v);
764                         return;
765                 }
766                 state->async.fn = vacuum_fetch_callback;
767                 state->async.private_data = v;
768                 return;
769         }
770
771         talloc_free(v);
772 }
773
774
775 /*
776   destroy a vacuum info structure
777  */
778 static int vacuum_info_destructor(struct vacuum_info *v)
779 {
780         DLIST_REMOVE(v->rec->vacuum_info, v);
781         return 0;
782 }
783
784
785 /*
786   handler for vacuum fetch
787 */
788 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
789                                  TDB_DATA data, void *private_data)
790 {
791         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
792         struct ctdb_marshall_buffer *recs;
793         int ret, i;
794         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
795         const char *name;
796         struct ctdb_dbid_map *dbmap=NULL;
797         bool persistent = false;
798         struct ctdb_db_context *ctdb_db;
799         struct ctdb_rec_data *r;
800         uint32_t srcnode;
801         struct vacuum_info *v;
802
803         recs = (struct ctdb_marshall_buffer *)data.dptr;
804         r = (struct ctdb_rec_data *)&recs->data[0];
805
806         if (recs->count == 0) {
807                 talloc_free(tmp_ctx);
808                 return;
809         }
810
811         srcnode = r->reqid;
812
813         for (v=rec->vacuum_info;v;v=v->next) {
814                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
815                         /* we're already working on records from this node */
816                         talloc_free(tmp_ctx);
817                         return;
818                 }
819         }
820
821         /* work out if the database is persistent */
822         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
823         if (ret != 0) {
824                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
825                 talloc_free(tmp_ctx);
826                 return;
827         }
828
829         for (i=0;i<dbmap->num;i++) {
830                 if (dbmap->dbs[i].dbid == recs->db_id) {
831                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
832                         break;
833                 }
834         }
835         if (i == dbmap->num) {
836                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
837                 talloc_free(tmp_ctx);
838                 return;         
839         }
840
841         /* find the name of this database */
842         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
843                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
844                 talloc_free(tmp_ctx);
845                 return;
846         }
847
848         /* attach to it */
849         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
850         if (ctdb_db == NULL) {
851                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
852                 talloc_free(tmp_ctx);
853                 return;
854         }
855
856         v = talloc_zero(rec, struct vacuum_info);
857         if (v == NULL) {
858                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
859                 talloc_free(tmp_ctx);
860                 return;
861         }
862
863         v->rec = rec;
864         v->srcnode = srcnode;
865         v->ctdb_db = ctdb_db;
866         v->recs = talloc_memdup(v, recs, data.dsize);
867         if (v->recs == NULL) {
868                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
869                 talloc_free(v);
870                 talloc_free(tmp_ctx);
871                 return;         
872         }
873         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
874
875         DLIST_ADD(rec->vacuum_info, v);
876
877         talloc_set_destructor(v, vacuum_info_destructor);
878
879         vacuum_fetch_next(v);
880         talloc_free(tmp_ctx);
881 }
882
883
884 /*
885   called when ctdb_wait_timeout should finish
886  */
887 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
888                               struct timeval yt, void *p)
889 {
890         uint32_t *timed_out = (uint32_t *)p;
891         (*timed_out) = 1;
892 }
893
894 /*
895   wait for a given number of seconds
896  */
897 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
898 {
899         uint32_t timed_out = 0;
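        /* split the timeout into whole seconds and remaining microseconds */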
900         time_t usecs = (secs - (time_t)secs) * 1000000;
901         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
902         while (!timed_out) {
903                 event_loop_once(ctdb->ev);
904         }
905 }
906
907 /*
908   called when an election times out (ends)
909  */
910 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
911                                   struct timeval t, void *p)
912 {
913         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
914         rec->election_timeout = NULL;
915         fast_start = false;
916
917         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
918 }
919
920
921 /*
922   wait for an election to finish. It finishes election_timeout seconds after
923   the last election packet is received
924  */
925 static void ctdb_wait_election(struct ctdb_recoverd *rec)
926 {
927         struct ctdb_context *ctdb = rec->ctdb;
928         while (rec->election_timeout) {
929                 event_loop_once(ctdb->ev);
930         }
931 }
932
933 /*
934   Update our local flags from all remote connected nodes. 
935   This is only run when we are, or believe we are, the recovery master
936  */
937 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
938 {
939         int j;
940         struct ctdb_context *ctdb = rec->ctdb;
941         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
942
943         /* get the nodemap for all active remote nodes and verify
944            they are the same as for this node
945          */
946         for (j=0; j<nodemap->num; j++) {
947                 struct ctdb_node_map *remote_nodemap=NULL;
948                 int ret;
949
950                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
951                         continue;
952                 }
953                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
954                         continue;
955                 }
956
957                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
958                                            mem_ctx, &remote_nodemap);
959                 if (ret != 0) {
960                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
961                                   nodemap->nodes[j].pnn));
962                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
963                         talloc_free(mem_ctx);
964                         return MONITOR_FAILED;
965                 }
966                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
967                         /* We should tell our daemon about this so it
968                            updates its flags or else we will log the same 
969                            message again in the next iteration of recovery.
970                            Since we are the recovery master we can just as
971                            well update the flags on all nodes.
972                         */
973                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
974                         if (ret != 0) {
975                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                                talloc_free(mem_ctx);
976                                 return MONITOR_FAILED;
977                         }
978
979                         /* Update our local copy of the flags in the recovery
980                            daemon.
981                         */
982                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
983                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
984                                  nodemap->nodes[j].flags));
985                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
986                 }
987                 talloc_free(remote_nodemap);
988         }
989         talloc_free(mem_ctx);
990         return MONITOR_OK;
991 }
992
993
994 /* Create a new random generation id.
995    The generation id cannot be the INVALID_GENERATION id
996 */
997 static uint32_t new_generation(void)
998 {
999         uint32_t generation;
1000
1001         while (1) {
1002                 generation = random();
1003
1004                 if (generation != INVALID_GENERATION) {
1005                         break;
1006                 }
1007         }
1008
1009         return generation;
1010 }
1011
1012
1013 /*
1014   create a temporary working database
1015  */
1016 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1017 {
1018         char *name;
1019         struct tdb_wrap *recdb;
1020         unsigned tdb_flags;
1021
1022         /* open up the temporary recovery database */
1023         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1024                                ctdb->db_directory_state,
1025                                ctdb->pnn);
1026         if (name == NULL) {
1027                 return NULL;
1028         }
1029         unlink(name);
1030
1031         tdb_flags = TDB_NOLOCK;
1032         if (ctdb->valgrinding) {
1033                 tdb_flags |= TDB_NOMMAP;
1034         }
1035         tdb_flags |= TDB_DISALLOW_NESTING;
1036
1037         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1038                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1039         if (recdb == NULL) {
1040                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1041         }
1042
1043         talloc_free(name);
1044
1045         return recdb;
1046 }
1047
1048
1049 /* 
1050    a traverse function for pulling all relevant records from recdb
1051  */
1052 struct recdb_data {
1053         struct ctdb_context *ctdb;
1054         struct ctdb_marshall_buffer *recdata;
1055         uint32_t len;
1056         bool failed;
1057         bool persistent;
1058 };
1059
1060 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1061 {
1062         struct recdb_data *params = (struct recdb_data *)p;
1063         struct ctdb_rec_data *rec;
1064         struct ctdb_ltdb_header *hdr;
1065
1066         /* skip empty records */
1067         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1068                 return 0;
1069         }
1070
1071         /* update the dmaster field to point to us */
1072         hdr = (struct ctdb_ltdb_header *)data.dptr;
1073         if (!params->persistent) {
1074                 hdr->dmaster = params->ctdb->pnn;
1075                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1076         }
1077
1078         /* add the record to the blob ready to send to the nodes */
1079         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1080         if (rec == NULL) {
1081                 params->failed = true;
1082                 return -1;
1083         }
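        /* grow the marshall buffer and append the packed record at the
           current write offset */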
1084         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1085         if (params->recdata == NULL) {
1086                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1087                          rec->length + params->len, params->recdata->count));
1088                 params->failed = true;
1089                 return -1;
1090         }
1091         params->recdata->count++;
1092         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1093         params->len += rec->length;
1094         talloc_free(rec);
1095
1096         return 0;
1097 }
1098
1099 /*
1100   push the recdb database out to all nodes
1101  */
1102 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1103                                bool persistent,
1104                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1105 {
1106         struct recdb_data params;
1107         struct ctdb_marshall_buffer *recdata;
1108         TDB_DATA outdata;
1109         TALLOC_CTX *tmp_ctx;
1110         uint32_t *nodes;
1111
1112         tmp_ctx = talloc_new(ctdb);
1113         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1114
1115         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1116         CTDB_NO_MEMORY(ctdb, recdata);
1117
1118         recdata->db_id = dbid;
1119
1120         params.ctdb = ctdb;
1121         params.recdata = recdata;
1122         params.len = offsetof(struct ctdb_marshall_buffer, data);
1123         params.failed = false;
1124         params.persistent = persistent;
1125
1126         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1127                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1128                 talloc_free(params.recdata);
1129                 talloc_free(tmp_ctx);
1130                 return -1;
1131         }
1132
1133         if (params.failed) {
1134                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1135                 talloc_free(params.recdata);
1136                 talloc_free(tmp_ctx);
1137                 return -1;              
1138         }
1139
1140         recdata = params.recdata;
1141
1142         outdata.dptr = (void *)recdata;
1143         outdata.dsize = params.len;
1144
1145         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1146         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1147                                         nodes, 0,
1148                                         CONTROL_TIMEOUT(), false, outdata,
1149                                         NULL, NULL,
1150                                         NULL) != 0) {
1151                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1152                 talloc_free(recdata);
1153                 talloc_free(tmp_ctx);
1154                 return -1;
1155         }
1156
1157         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x (%u records)\n", 
1158                   dbid, recdata->count));
1159
1160         talloc_free(recdata);
1161         talloc_free(tmp_ctx);
1162
1163         return 0;
1164 }
1165
1166
1167 /*
1168   go through a full recovery on one database 
1169  */
1170 static int recover_database(struct ctdb_recoverd *rec, 
1171                             TALLOC_CTX *mem_ctx,
1172                             uint32_t dbid,
1173                             bool persistent,
1174                             uint32_t pnn, 
1175                             struct ctdb_node_map *nodemap,
1176                             uint32_t transaction_id)
1177 {
1178         struct tdb_wrap *recdb;
1179         int ret;
1180         struct ctdb_context *ctdb = rec->ctdb;
1181         TDB_DATA data;
1182         struct ctdb_control_wipe_database w;
1183         uint32_t *nodes;
1184
1185         recdb = create_recdb(ctdb, mem_ctx);
1186         if (recdb == NULL) {
1187                 return -1;
1188         }
1189
1190         /* pull all remote databases onto the recdb */
1191         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1192         if (ret != 0) {
1193                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1194                 return -1;
1195         }
1196
1197         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1198
1199         /* wipe all the remote databases. This is safe as we are in a transaction */
1200         w.db_id = dbid;
1201         w.transaction_id = transaction_id;
1202
1203         data.dptr = (void *)&w;
1204         data.dsize = sizeof(w);
1205
1206         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1207         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1208                                         nodes, 0,
1209                                         CONTROL_TIMEOUT(), false, data,
1210                                         NULL, NULL,
1211                                         NULL) != 0) {
1212                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1213                 talloc_free(recdb);
1214                 return -1;
1215         }
1216         
1217         /* push out the correct database. This sets the dmaster and skips 
1218            the empty records */
1219         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1220         if (ret != 0) {
1221                 talloc_free(recdb);
1222                 return -1;
1223         }
1224
1225         /* all done with this database */
1226         talloc_free(recdb);
1227
1228         return 0;
1229 }
1230
1231 /*
1232   reload the nodes file 
1233 */
1234 static void reload_nodes_file(struct ctdb_context *ctdb)
1235 {
1236         ctdb->nodes = NULL;
1237         ctdb_load_nodes_file(ctdb);
1238 }
1239
1240 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1241                                          struct ctdb_recoverd *rec,
1242                                          struct ctdb_node_map *nodemap,
1243                                          uint32_t *culprit)
1244 {
1245         int j;
1246         int ret;
1247
1248         if (ctdb->num_nodes != nodemap->num) {
1249                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1250                                   ctdb->num_nodes, nodemap->num));
1251                 if (culprit) {
1252                         *culprit = ctdb->pnn;
1253                 }
1254                 return -1;
1255         }
1256
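        /* refresh both the known and the available public ip lists for
           every active node; any node that fails to answer is reported
           as the culprit */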
1257         for (j=0; j<nodemap->num; j++) {
1258                 /* release any existing data */
1259                 if (ctdb->nodes[j]->known_public_ips) {
1260                         talloc_free(ctdb->nodes[j]->known_public_ips);
1261                         ctdb->nodes[j]->known_public_ips = NULL;
1262                 }
1263                 if (ctdb->nodes[j]->available_public_ips) {
1264                         talloc_free(ctdb->nodes[j]->available_public_ips);
1265                         ctdb->nodes[j]->available_public_ips = NULL;
1266                 }
1267
1268                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1269                         continue;
1270                 }
1271
1272                 /* grab a new shiny list of known public ips from the node */
1273                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1274                                         CONTROL_TIMEOUT(),
1275                                         ctdb->nodes[j]->pnn,
1276                                         ctdb->nodes,
1277                                         0,
1278                                         &ctdb->nodes[j]->known_public_ips);
1279                 if (ret != 0) {
1280                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1281                                 ctdb->nodes[j]->pnn));
1282                         if (culprit) {
1283                                 *culprit = ctdb->nodes[j]->pnn;
1284                         }
1285                         return -1;
1286                 }
1287
1288                 if (ctdb->tunable.disable_ip_failover == 0) {
1289                         if (rec->ip_check_disable_ctx == NULL) {
1290                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1291                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1292                                         rec->need_takeover_run = true;
1293                                 }
1294                         }
1295                 }
1296
1297                 /* grab a new shiny list of available public ips from the node */
1298                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1299                                         CONTROL_TIMEOUT(),
1300                                         ctdb->nodes[j]->pnn,
1301                                         ctdb->nodes,
1302                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1303                                         &ctdb->nodes[j]->available_public_ips);
1304                 if (ret != 0) {
1305                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1306                                 ctdb->nodes[j]->pnn));
1307                         if (culprit) {
1308                                 *culprit = ctdb->nodes[j]->pnn;
1309                         }
1310                         return -1;
1311                 }
1312         }
1313
1314         return 0;
1315 }
1316
1317 /* when we start a recovery, make sure all nodes use the same reclock file
1318    setting
1319 */
1320 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1321 {
1322         struct ctdb_context *ctdb = rec->ctdb;
1323         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1324         TDB_DATA data;
1325         uint32_t *nodes;
1326
1327         if (ctdb->recovery_lock_file == NULL) {
1328                 data.dptr  = NULL;
1329                 data.dsize = 0;
1330         } else {
1331                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1332                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1333         }
1334
1335         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1336         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1337                                         nodes, 0,
1338                                         CONTROL_TIMEOUT(),
1339                                         false, data,
1340                                         NULL, NULL,
1341                                         rec) != 0) {
1342                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1343                 talloc_free(tmp_ctx);
1344                 return -1;
1345         }
1346
1347         talloc_free(tmp_ctx);
1348         return 0;
1349 }
1350
1351
1352 /*
1353   we are the recmaster, and recovery is needed - start a recovery run
1354  */
1355 static int do_recovery(struct ctdb_recoverd *rec, 
1356                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1357                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1358 {
1359         struct ctdb_context *ctdb = rec->ctdb;
1360         int i, j, ret;
1361         uint32_t generation;
1362         struct ctdb_dbid_map *dbmap;
1363         TDB_DATA data;
1364         uint32_t *nodes;
1365         struct timeval start_time;
1366         uint32_t culprit = (uint32_t)-1;
1367
1368         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1369
1370         /* if recovery fails, force it again */
1371         rec->need_recovery = true;
1372
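        /* ban any node that has accumulated at least twice as many
           culprit credits as there are nodes in the cluster */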
1373         for (i=0; i<ctdb->num_nodes; i++) {
1374                 struct ctdb_banning_state *ban_state;
1375
1376                 if (ctdb->nodes[i]->ban_state == NULL) {
1377                         continue;
1378                 }
1379                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1380                 if (ban_state->count < 2*ctdb->num_nodes) {
1381                         continue;
1382                 }
1383                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1384                         ctdb->nodes[i]->pnn, ban_state->count,
1385                         ctdb->tunable.recovery_ban_period));
1386                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1387                 ban_state->count = 0;
1388         }
1389
1390
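        /* when reclock verification is enabled, take the recovery lock
           before proceeding; if we cannot get it we ban ourselves and
           abort this recovery attempt */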
1391         if (ctdb->tunable.verify_recovery_lock != 0) {
1392                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1393                 start_time = timeval_current();
1394                 if (!ctdb_recovery_lock(ctdb, true)) {
1395                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1396                                          "and ban ourself for %u seconds\n",
1397                                          ctdb->tunable.recovery_ban_period));
1398                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1399                         return -1;
1400                 }
1401                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1402                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1403         }
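             /* from this point on (when reclock verification is enabled) we
                hold the recovery lock while the recovery proceeds */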
1404
1405         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1406
1407         /* get a list of all databases */
1408         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1409         if (ret != 0) {
1410                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1411                 return -1;
1412         }
1413
1414         /* we do the db creation before we set the recovery mode, so the freeze happens
1415            on all databases we will be dealing with. */
1416
1417         /* verify that we have all the databases any other node has */
1418         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1419         if (ret != 0) {
1420                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1421                 return -1;
1422         }
1423
1424         /* verify that all other nodes have all our databases */
1425         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1426         if (ret != 0) {
1427                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1428                 return -1;
1429         }
1430         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1431
1432         /* update the database priority for all remote databases */
1433         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1434         if (ret != 0) {
1435                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1436         }
1437         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1438
1439
1440         /* update all other nodes to use the same setting for reclock files
1441            as the local recovery master.
1442         */
1443         sync_recovery_lock_file_across_cluster(rec);
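             /* note: the return value is not checked here, so recovery
                continues even if the reclock setting could not be pushed to
                every node */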
1444
1445         /* set recovery mode to active on all nodes */
1446         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1447         if (ret != 0) {
1448                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1449                 return -1;
1450         }
1451
1452         /* execute the "startrecovery" event script on all nodes */
1453         ret = run_startrecovery_eventscript(rec, nodemap);
1454         if (ret!=0) {
1455                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1456                 return -1;
1457         }
1458
1459         /*
1460           update all nodes to have the same flags that we have
1461          */
1462         for (i=0;i<nodemap->num;i++) {
1463                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1464                         continue;
1465                 }
1466
1467                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1468                 if (ret != 0) {
1469                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1470                         return -1;
1471                 }
1472         }
1473
1474         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1475
1476         /* pick a new generation number */
1477         generation = new_generation();
1478
1479         /* change the vnnmap on this node to use the new generation
1480            number, but not on any other nodes.
1481            This guarantees that if we abort the recovery prematurely
1482            for some reason (e.g. a node stops responding), we can just
1483            return immediately and we will re-enter recovery again
1484            shortly.
1485            I.e. we deliberately leave the cluster with an inconsistent
1486            generation id to allow us to abort recovery at any stage and
1487            just restart it from scratch.
1488          */
1489         vnnmap->generation = generation;
1490         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1491         if (ret != 0) {
1492                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1493                 return -1;
1494         }
1495
1496         data.dptr = (void *)&generation;
1497         data.dsize = sizeof(uint32_t);
1498
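             /* start a transaction on all active nodes, carrying the new
                generation number; if that fails, try to cancel the
                transaction everywhere and abort this recovery attempt */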
1499         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1500         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1501                                         nodes, 0,
1502                                         CONTROL_TIMEOUT(), false, data,
1503                                         NULL,
1504                                         transaction_start_fail_callback,
1505                                         rec) != 0) {
1506                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1507                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1508                                         nodes, 0,
1509                                         CONTROL_TIMEOUT(), false, tdb_null,
1510                                         NULL,
1511                                         NULL,
1512                                         NULL) != 0) {
1513                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1514                 }
1515                 return -1;
1516         }
1517
1518         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1519
1520         for (i=0;i<dbmap->num;i++) {
1521                 ret = recover_database(rec, mem_ctx,
1522                                        dbmap->dbs[i].dbid,
1523                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1524                                        pnn, nodemap, generation);
1525                 if (ret != 0) {
1526                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1527                         return -1;
1528                 }
1529         }
1530
1531         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1532
1533         /* commit all the changes */
1534         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1535                                         nodes, 0,
1536                                         CONTROL_TIMEOUT(), false, data,
1537                                         NULL, NULL,
1538                                         NULL) != 0) {
1539                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1540                 return -1;
1541         }
1542
1543         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1544         
1545
1546         /* update the capabilities for all nodes */
1547         ret = update_capabilities(ctdb, nodemap);
1548         if (ret!=0) {
1549                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1550                 return -1;
1551         }
1552
1553         /* build a new vnn map with all the currently active and
1554            unbanned nodes */
1555         generation = new_generation();
1556         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1557         CTDB_NO_MEMORY(ctdb, vnnmap);
1558         vnnmap->generation = generation;
1559         vnnmap->size = 0;
1560         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1561         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1562         for (i=j=0;i<nodemap->num;i++) {
1563                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1564                         continue;
1565                 }
1566                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1567                         /* this node cannot be an lmaster */
1568                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1569                         continue;
1570                 }
1571
1572                 vnnmap->size++;
1573                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1574                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1575                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1576
1577         }
1578         if (vnnmap->size == 0) {
1579                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1580                 vnnmap->size++;
1581                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1582                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1583                 vnnmap->map[0] = pnn;
1584         }       
1585
1586         /* update to the new vnnmap on all nodes */
1587         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1588         if (ret != 0) {
1589                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1590                 return -1;
1591         }
1592
1593         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1594
1595         /* update recmaster to point to us for all nodes */
1596         ret = set_recovery_master(ctdb, nodemap, pnn);
1597         if (ret!=0) {
1598                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1599                 return -1;
1600         }
1601
1602         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1603
1604         /*
1605           update all nodes to have the same flags that we have
1606          */
1607         for (i=0;i<nodemap->num;i++) {
1608                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1609                         continue;
1610                 }
1611
1612                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1613                 if (ret != 0) {
1614                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1615                         return -1;
1616                 }
1617         }
1618
1619         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1620
1621         /* disable recovery mode */
1622         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1623         if (ret != 0) {
1624                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1625                 return -1;
1626         }
1627
1628         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1629
1630         /*
1631           tell nodes to takeover their public IPs
1632          */
1633         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1634         if (ret != 0) {
1635                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1636                                  culprit));
1637                 rec->need_takeover_run = true;
1638                 return -1;
1639         }
1640         rec->need_takeover_run = false;
1641         ret = ctdb_takeover_run(ctdb, nodemap);
1642         if (ret != 0) {
1643                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1644                 rec->need_takeover_run = true;
1645         }
1646
1647         /* execute the "recovered" event script on all nodes */
1648         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1649         if (ret!=0) {
1650                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1651                 return -1;
1652         }
1653
1654         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1655
1656         /* send a message to all clients telling them that the cluster 
1657            has been reconfigured */
1658         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1659
1660         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1661
1662         rec->need_recovery = false;
1663
1664         /* we managed to complete a full recovery, make sure to forgive
1665            any past sins by the nodes that could now participate in the
1666            recovery.
1667         */
1668         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1669         for (i=0;i<nodemap->num;i++) {
1670                 struct ctdb_banning_state *ban_state;
1671
1672                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1673                         continue;
1674                 }
1675
1676                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1677                 if (ban_state == NULL) {
1678                         continue;
1679                 }
1680
1681                 ban_state->count = 0;
1682         }
1683
1684
1685         /* We just finished a recovery successfully. 
1686            We now wait for rerecovery_timeout before we allow 
1687            another recovery to take place.
1688         */
1689         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1690         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1691         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1692
1693         return 0;
1694 }
1695
1696
1697 /*
1698   elections are won by first checking the number of connected nodes, then
1699   the priority time, then the pnn
1700  */
1701 struct election_message {
1702         uint32_t num_connected;
1703         struct timeval priority_time;
1704         uint32_t pnn;
1705         uint32_t node_flags;
1706 };
1707
1708 /*
1709   form this node's election data
1710  */
1711 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1712 {
1713         int ret, i;
1714         struct ctdb_node_map *nodemap;
1715         struct ctdb_context *ctdb = rec->ctdb;
1716
1717         ZERO_STRUCTP(em);
1718
1719         em->pnn = rec->ctdb->pnn;
1720         em->priority_time = rec->priority_time;
1721
1722         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1723         if (ret != 0) {
1724                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1725                 return;
1726         }
1727
1728         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1729         em->node_flags = rec->node_flags;
1730
1731         for (i=0;i<nodemap->num;i++) {
1732                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1733                         em->num_connected++;
1734                 }
1735         }
1736
1737         /* we shouldn't try to win this election if we can't be a recmaster */
1738         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1739                 em->num_connected = 0;
1740                 em->priority_time = timeval_current();
1741         }
1742
1743         talloc_free(nodemap);
1744 }
1745
1746 /*
1747   see if the given election data wins
1748  */
1749 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1750 {
1751         struct election_message myem;
1752         int cmp = 0;
1753
1754         ctdb_election_data(rec, &myem);
1755
1756         /* we can't win if we don't have the recmaster capability */
1757         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1758                 return false;
1759         }
1760
1761         /* we can't win if we are banned */
1762         if (rec->node_flags & NODE_FLAGS_BANNED) {
1763                 return false;
1764         }       
1765
1766         /* we can't win if we are stopped */
1767         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1768                 return false;
1769         }       
1770
1771         /* we will automatically win if the other node is banned */
1772         if (em->node_flags & NODE_FLAGS_BANNED) {
1773                 return true;
1774         }
1775
1776         /* we will automatically win if the other node is stopped */
1777         if (em->node_flags & NODE_FLAGS_STOPPED) {
1778                 return true;
1779         }
1780
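             /* compare the two sets of election data; a positive cmp below
                means our own data wins */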
1781         /* try to use the most connected node */
1782         if (cmp == 0) {
1783                 cmp = (int)myem.num_connected - (int)em->num_connected;
1784         }
1785
1786         /* then the longest running node */
1787         if (cmp == 0) {
1788                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1789         }
1790
1791         if (cmp == 0) {
1792                 cmp = (int)myem.pnn - (int)em->pnn;
1793         }
1794
1795         return cmp > 0;
1796 }
1797
1798 /*
1799   send out an election request
1800  */
1801 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1802 {
1803         int ret;
1804         TDB_DATA election_data;
1805         struct election_message emsg;
1806         uint64_t srvid;
1807         struct ctdb_context *ctdb = rec->ctdb;
1808
1809         srvid = CTDB_SRVID_RECOVERY;
1810
1811         ctdb_election_data(rec, &emsg);
1812
1813         election_data.dsize = sizeof(struct election_message);
1814         election_data.dptr  = (unsigned char *)&emsg;
1815
1816
1817         /* send an election message to all active nodes */
1818         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1819         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1820
1821
1822         /* A new node that is already frozen has entered the cluster.
1823            The existing nodes are not frozen and don't need to be frozen
1824            until the election has ended and we start the actual recovery
1825         */
1826         if (update_recmaster == true) {
1827                 /* first we assume we will win the election and set 
1828                    recoverymaster to be ourself on the current node
1829                  */
1830                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1831                 if (ret != 0) {
1832                         DEBUG(DEBUG_ERR, (__location__ " failed to set ourselves as recovery master on the local node\n"));
1833                         return -1;
1834                 }
1835         }
1836
1837
1838         return 0;
1839 }
1840
1841 /*
1842   this function will unban all nodes in the cluster
1843 */
1844 static void unban_all_nodes(struct ctdb_context *ctdb)
1845 {
1846         int ret, i;
1847         struct ctdb_node_map *nodemap;
1848         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1849         
1850         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1851         if (ret != 0) {
1852                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1853                 return;
1854         }
1855
1856         for (i=0;i<nodemap->num;i++) {
1857                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1858                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1859                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1860                 }
1861         }
1862
1863         talloc_free(tmp_ctx);
1864 }
1865
1866
1867 /*
1868   we think we are winning the election - send a broadcast election request
1869  */
1870 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1871 {
1872         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1873         int ret;
1874
1875         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1876         if (ret != 0) {
1877                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1878         }
1879
1880         talloc_free(rec->send_election_te);
1881         rec->send_election_te = NULL;
1882 }
1883
1884 /*
1885   handler for memory dumps
1886 */
1887 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1888                              TDB_DATA data, void *private_data)
1889 {
1890         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1891         TDB_DATA *dump;
1892         int ret;
1893         struct rd_memdump_reply *rd;
1894
1895         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1896                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1897                 talloc_free(tmp_ctx);
1898                 return;
1899         }
1900         rd = (struct rd_memdump_reply *)data.dptr;
1901
1902         dump = talloc_zero(tmp_ctx, TDB_DATA);
1903         if (dump == NULL) {
1904                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1905                 talloc_free(tmp_ctx);
1906                 return;
1907         }
1908         ret = ctdb_dump_memory(ctdb, dump);
1909         if (ret != 0) {
1910                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1911                 talloc_free(tmp_ctx);
1912                 return;
1913         }
1914
1915         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1916
1917         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1918         if (ret != 0) {
1919                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1920                 talloc_free(tmp_ctx);
1921                 return;
1922         }
1923
1924         talloc_free(tmp_ctx);
1925 }
1926
1927 /*
1928   handler for reload_nodes
1929 */
1930 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1931                              TDB_DATA data, void *private_data)
1932 {
1933         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1934
1935         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1936
1937         reload_nodes_file(rec->ctdb);
1938 }
1939
1940
1941 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1942                               struct timeval yt, void *p)
1943 {
1944         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1945
1946         talloc_free(rec->ip_check_disable_ctx);
1947         rec->ip_check_disable_ctx = NULL;
1948 }
1949
1950
1951 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1952                              TDB_DATA data, void *private_data)
1953 {
1954         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1955         struct ctdb_public_ip *ip;
1956
1957         if (rec->recmaster != rec->ctdb->pnn) {
1958                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1959                 return;
1960         }
1961
1962         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1963                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1964                 return;
1965         }
1966
1967         ip = (struct ctdb_public_ip *)data.dptr;
1968
1969         update_ip_assignment_tree(rec->ctdb, ip);
1970 }
1971
1972
1973 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1974                              TDB_DATA data, void *private_data)
1975 {
1976         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1977         uint32_t timeout;
1978
1979         if (rec->ip_check_disable_ctx != NULL) {
1980                 talloc_free(rec->ip_check_disable_ctx);
1981                 rec->ip_check_disable_ctx = NULL;
1982         }
1983
1984         if (data.dsize != sizeof(uint32_t)) {
1985                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data: %lu, "
1986                                  "expecting %lu\n", (long unsigned)data.dsize,
1987                                  (long unsigned)sizeof(uint32_t)));
1988                 return;
1989         }
1990         if (data.dptr == NULL) {
1991                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1992                 return;
1993         }
1994
1995         timeout = *((uint32_t *)data.dptr);
1996         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1997
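             /* a non-NULL ip_check_disable_ctx marks the ip checks as
                disabled; the timed event below is parented to it, so
                reenable_ip_check frees it when the timeout fires and freeing
                the context from elsewhere cancels the pending re-enable */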
1998         rec->ip_check_disable_ctx = talloc_new(rec);
1999         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2000
2001         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2002 }
2003
2004
2005 /*
2006   handler for ip reallocate, just add it to the list of callers and 
2007   handle this later in the monitor_cluster loop so we do not recurse
2008   with other callers to takeover_run()
2009 */
2010 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2011                              TDB_DATA data, void *private_data)
2012 {
2013         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2014         struct ip_reallocate_list *caller;
2015
2016         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2017                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2018                 return;
2019         }
2020
2021         if (rec->ip_reallocate_ctx == NULL) {
2022                 rec->ip_reallocate_ctx = talloc_new(rec);
2023                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2024         }
2025
2026         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2027         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2028
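             /* steal the return-address payload so it stays valid until the
                request is processed later in the monitor loop */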
2029         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2030         caller->next = rec->reallocate_callers;
2031         rec->reallocate_callers = caller;
2032
2033         return;
2034 }
2035
2036 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2037 {
2038         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2039         TDB_DATA result;
2040         int32_t ret;
2041         struct ip_reallocate_list *callers;
2042         uint32_t culprit;
2043
2044         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2045
2046         /* update the list of public ips that a node can handle for
2047            all connected nodes
2048         */
2049         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2050         if (ret != 0) {
2051                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2052                                  culprit));
2053                 rec->need_takeover_run = true;
2054         }
2055         if (ret == 0) {
2056                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2057                 if (ret != 0) {
2058                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2059                         rec->need_takeover_run = true;
2060                 }
2061         }
2062
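             /* pack the takeover run status and send it back to every caller
                that supplied a reply srvid */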
2063         result.dsize = sizeof(int32_t);
2064         result.dptr  = (uint8_t *)&ret;
2065
2066         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2067
2068                 /* Someone that sent srvid==0 does not want a reply */
2069                 if (callers->rd->srvid == 0) {
2070                         continue;
2071                 }
2072                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2073                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2074                                   (unsigned long long)callers->rd->srvid));
2075                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2076                 if (ret != 0) {
2077                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2078                                          "message to %u:%llu\n",
2079                                          (unsigned)callers->rd->pnn,
2080                                          (unsigned long long)callers->rd->srvid));
2081                 }
2082         }
2083
2084         talloc_free(tmp_ctx);
2085         talloc_free(rec->ip_reallocate_ctx);
2086         rec->ip_reallocate_ctx = NULL;
2087         rec->reallocate_callers = NULL;
2088         
2089 }
2090
2091
2092 /*
2093   handler for recovery master elections
2094 */
2095 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2096                              TDB_DATA data, void *private_data)
2097 {
2098         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2099         int ret;
2100         struct election_message *em = (struct election_message *)data.dptr;
2101         TALLOC_CTX *mem_ctx;
2102
2103         /* we got an election packet - update the timeout for the election */
2104         talloc_free(rec->election_timeout);
2105         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2106                                                 fast_start ?
2107                                                 timeval_current_ofs(0, 500000) :
2108                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2109                                                 ctdb_election_timeout, rec);
2110
2111         mem_ctx = talloc_new(ctdb);
2112
2113         /* someone called an election. check their election data
2114            and if we disagree and we would rather be the elected node, 
2115            send a new election message to all other nodes
2116          */
2117         if (ctdb_election_win(rec, em)) {
2118                 if (!rec->send_election_te) {
2119                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2120                                                                 timeval_current_ofs(0, 500000),
2121                                                                 election_send_request, rec);
2122                 }
2123                 talloc_free(mem_ctx);
2124                 /*unban_all_nodes(ctdb);*/
2125                 return;
2126         }
2127         
2128         /* we didn't win */
2129         talloc_free(rec->send_election_te);
2130         rec->send_election_te = NULL;
2131
2132         if (ctdb->tunable.verify_recovery_lock != 0) {
2133                 /* release the recmaster lock */
2134                 if (em->pnn != ctdb->pnn &&
2135                     ctdb->recovery_lock_fd != -1) {
2136                         close(ctdb->recovery_lock_fd);
2137                         ctdb->recovery_lock_fd = -1;
2138                         unban_all_nodes(ctdb);
2139                 }
2140         }
2141
2142         /* ok, let that guy become recmaster then */
2143         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2144         if (ret != 0) {
2145                 DEBUG(DEBUG_ERR, (__location__ " failed to set the new recovery master\n"));
2146                 talloc_free(mem_ctx);
2147                 return;
2148         }
2149
2150         talloc_free(mem_ctx);
2151         return;
2152 }
2153
2154
2155 /*
2156   force the start of the election process
2157  */
2158 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2159                            struct ctdb_node_map *nodemap)
2160 {
2161         int ret;
2162         struct ctdb_context *ctdb = rec->ctdb;
2163
2164         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2165
2166         /* set all nodes to recovery mode to stop all internode traffic */
2167         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2168         if (ret != 0) {
2169                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2170                 return;
2171         }
2172
2173         talloc_free(rec->election_timeout);
2174         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2175                                                 fast_start ?
2176                                                 timeval_current_ofs(0, 500000) :
2177                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2178                                                 ctdb_election_timeout, rec);
2179
2180         ret = send_election_request(rec, pnn, true);
2181         if (ret!=0) {
2182                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2183                 return;
2184         }
2185
2186         /* wait for a few seconds to collect all responses */
2187         ctdb_wait_election(rec);
2188 }
2189
2190
2191
2192 /*
2193   handler for when a node changes its flags
2194 */
2195 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2196                             TDB_DATA data, void *private_data)
2197 {
2198         int ret;
2199         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2200         struct ctdb_node_map *nodemap=NULL;
2201         TALLOC_CTX *tmp_ctx;
2202         int i;
2203         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2204         int disabled_flag_changed;
2205
2206         if (data.dsize != sizeof(*c)) {
2207                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2208                 return;
2209         }
2210
2211         tmp_ctx = talloc_new(ctdb);
2212         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2213
2214         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2215         if (ret != 0) {
2216                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2217                 talloc_free(tmp_ctx);
2218                 return;         
2219         }
2220
2221
2222         for (i=0;i<nodemap->num;i++) {
2223                 if (nodemap->nodes[i].pnn == c->pnn) break;
2224         }
2225
2226         if (i == nodemap->num) {
2227                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2228                 talloc_free(tmp_ctx);
2229                 return;
2230         }
2231
2232         if (nodemap->nodes[i].flags != c->new_flags) {
2233                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2234         }
2235
2236         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2237
2238         nodemap->nodes[i].flags = c->new_flags;
2239
2240         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2241                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2242
2243         if (ret == 0) {
2244                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2245                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2246         }
2247         
2248         if (ret == 0 &&
2249             ctdb->recovery_master == ctdb->pnn &&
2250             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2251                 /* Only do the takeover run if the permanently-disabled or
2252                    unhealthy flags changed, since these cause an ip failover
2253                    but not a recovery.
2254                    If the node became disconnected or banned this will also
2255                    lead to an ip address failover, but that is handled
2256                    during recovery.
2257                 */
2258                 if (disabled_flag_changed) {
2259                         rec->need_takeover_run = true;
2260                 }
2261         }
2262
2263         talloc_free(tmp_ctx);
2264 }
2265
2266 /*
2267   handler for when we need to push out flag changes to all other nodes
2268 */
2269 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2270                             TDB_DATA data, void *private_data)
2271 {
2272         int ret;
2273         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2274         struct ctdb_node_map *nodemap=NULL;
2275         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2276         uint32_t recmaster;
2277         uint32_t *nodes;
2278
2279         /* find the recovery master */
2280         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2281         if (ret != 0) {
2282                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2283                 talloc_free(tmp_ctx);
2284                 return;
2285         }
2286
2287         /* read the node flags from the recmaster */
2288         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2289         if (ret != 0) {
2290                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2291                 talloc_free(tmp_ctx);
2292                 return;
2293         }
2294         if (c->pnn >= nodemap->num) {
2295                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2296                 talloc_free(tmp_ctx);
2297                 return;
2298         }
2299
2300         /* send the flags update to all connected nodes */
2301         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2302
2303         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2304                                       nodes, 0, CONTROL_TIMEOUT(),
2305                                       false, data,
2306                                       NULL, NULL,
2307                                       NULL) != 0) {
2308                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2309
2310                 talloc_free(tmp_ctx);
2311                 return;
2312         }
2313
2314         talloc_free(tmp_ctx);
2315 }
2316
2317
2318 struct verify_recmode_normal_data {
2319         uint32_t count;
2320         enum monitor_result status;
2321 };
2322
2323 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2324 {
2325         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2326
2327
2328         /* one more node has responded with recmode data*/
2329         rmdata->count--;
2330
2331         /* if we failed to get the recmode, then return an error and let
2332            the main loop try again.
2333         */
2334         if (state->state != CTDB_CONTROL_DONE) {
2335                 if (rmdata->status == MONITOR_OK) {
2336                         rmdata->status = MONITOR_FAILED;
2337                 }
2338                 return;
2339         }
2340
2341         /* if we got a response, then the recmode will be stored in the
2342            status field
2343         */
2344         if (state->status != CTDB_RECOVERY_NORMAL) {
2345                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2346                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2347         }
2348
2349         return;
2350 }
2351
2352
2353 /* verify that all nodes are in normal recovery mode */
2354 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2355 {
2356         struct verify_recmode_normal_data *rmdata;
2357         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2358         struct ctdb_client_control_state *state;
2359         enum monitor_result status;
2360         int j;
2361         
2362         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2363         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2364         rmdata->count  = 0;
2365         rmdata->status = MONITOR_OK;
2366
2367         /* loop over all active nodes and send an async getrecmode call to 
2368            them*/
2369         for (j=0; j<nodemap->num; j++) {
2370                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2371                         continue;
2372                 }
2373                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2374                                         CONTROL_TIMEOUT(), 
2375                                         nodemap->nodes[j].pnn);
2376                 if (state == NULL) {
2377                         /* we failed to send the control, treat this as 
2378                            an error and try again next iteration
2379                         */                      
2380                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2381                         talloc_free(mem_ctx);
2382                         return MONITOR_FAILED;
2383                 }
2384
2385                 /* set up the callback functions */
2386                 state->async.fn = verify_recmode_normal_callback;
2387                 state->async.private_data = rmdata;
2388
2389                 /* one more control to wait for to complete */
2390                 rmdata->count++;
2391         }
2392
2393
2394         /* now wait for up to the maximum number of seconds allowed
2395            or until all nodes we expect a response from have replied
2396         */
2397         while (rmdata->count > 0) {
2398                 event_loop_once(ctdb->ev);
2399         }
2400
2401         status = rmdata->status;
2402         talloc_free(mem_ctx);
2403         return status;
2404 }
2405
2406
2407 struct verify_recmaster_data {
2408         struct ctdb_recoverd *rec;
2409         uint32_t count;
2410         uint32_t pnn;
2411         enum monitor_result status;
2412 };
2413
2414 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2415 {
2416         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2417
2418
2419         /* one more node has responded with recmaster data*/
2420         rmdata->count--;
2421
2422         /* if we failed to get the recmaster, then return an error and let
2423            the main loop try again.
2424         */
2425         if (state->state != CTDB_CONTROL_DONE) {
2426                 if (rmdata->status == MONITOR_OK) {
2427                         rmdata->status = MONITOR_FAILED;
2428                 }
2429                 return;
2430         }
2431
2432         /* if we got a response, then the recmaster will be stored in the
2433            status field
2434         */
2435         if (state->status != rmdata->pnn) {
2436                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2437                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2438                 rmdata->status = MONITOR_ELECTION_NEEDED;
2439         }
2440
2441         return;
2442 }
2443
2444
2445 /* verify that all nodes agree that we are the recmaster */
2446 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2447 {
2448         struct ctdb_context *ctdb = rec->ctdb;
2449         struct verify_recmaster_data *rmdata;
2450         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2451         struct ctdb_client_control_state *state;
2452         enum monitor_result status;
2453         int j;
2454         
2455         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2456         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2457         rmdata->rec    = rec;
2458         rmdata->count  = 0;
2459         rmdata->pnn    = pnn;
2460         rmdata->status = MONITOR_OK;
2461
2462         /* loop over all active nodes and send an async getrecmaster call to 
2463            them*/
2464         for (j=0; j<nodemap->num; j++) {
2465                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2466                         continue;
2467                 }
2468                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2469                                         CONTROL_TIMEOUT(),
2470                                         nodemap->nodes[j].pnn);
2471                 if (state == NULL) {
2472                         /* we failed to send the control, treat this as 
2473                            an error and try again next iteration
2474                         */                      
2475                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2476                         talloc_free(mem_ctx);
2477                         return MONITOR_FAILED;
2478                 }
2479
2480                 /* set up the callback functions */
2481                 state->async.fn = verify_recmaster_callback;
2482                 state->async.private_data = rmdata;
2483
2484                 /* one more control to wait for to complete */
2485                 rmdata->count++;
2486         }
2487
2488
2489         /* now wait for up to the maximum number of seconds allowed
2490            or until all nodes we expect a response from have replied
2491         */
2492         while (rmdata->count > 0) {
2493                 event_loop_once(ctdb->ev);
2494         }
2495
2496         status = rmdata->status;
2497         talloc_free(mem_ctx);
2498         return status;
2499 }
2500
2501
2502 /* called to check that the local allocation of public ip addresses is ok.
2503 */
2504 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2505 {
2506         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2507         struct ctdb_control_get_ifaces *ifaces = NULL;
2508         struct ctdb_all_public_ips *ips = NULL;
2509         struct ctdb_uptime *uptime1 = NULL;
2510         struct ctdb_uptime *uptime2 = NULL;
2511         int ret, j;
2512         bool need_iface_check = false;
2513         bool need_takeover_run = false;
2514
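             /* read the uptime now and again after fetching the ip list; if a
                recovery started or finished in between, the address checks
                below are skipped for this iteration */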
2515         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2516                                 CTDB_CURRENT_NODE, &uptime1);
2517         if (ret != 0) {
2518                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2519                 talloc_free(mem_ctx);
2520                 return -1;
2521         }
2522
2523
2524         /* read the interfaces from the local node */
2525         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2526         if (ret != 0) {
2527                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2528                 talloc_free(mem_ctx);
2529                 return -1;
2530         }
2531
2532         if (!rec->ifaces) {
2533                 need_iface_check = true;
2534         } else if (rec->ifaces->num != ifaces->num) {
2535                 need_iface_check = true;
2536         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2537                 need_iface_check = true;
2538         }
2539
2540         if (need_iface_check) {
2541                 DEBUG(DEBUG_NOTICE, ("The interface status has changed on "
2542                                      "local node %u - force takeover run\n",
2543                                      pnn));
2544                 need_takeover_run = true;
2545         }
2546
2547         /* read the ip allocation from the local node */
2548         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2549         if (ret != 0) {
2550                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2551                 talloc_free(mem_ctx);
2552                 return -1;
2553         }
2554
2555         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2556                                 CTDB_CURRENT_NODE, &uptime2);
2557         if (ret != 0) {
2558                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2559                 talloc_free(mem_ctx);
2560                 return -1;
2561         }
2562
2563         /* skip the check if the startrecovery time has changed */
2564         if (timeval_compare(&uptime1->last_recovery_started,
2565                             &uptime2->last_recovery_started) != 0) {
2566                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2567                 talloc_free(mem_ctx);
2568                 return 0;
2569         }
2570
2571         /* skip the check if the endrecovery time has changed */
2572         if (timeval_compare(&uptime1->last_recovery_finished,
2573                             &uptime2->last_recovery_finished) != 0) {
2574                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2575                 talloc_free(mem_ctx);
2576                 return 0;
2577         }
2578
2579         /* skip the check if we have started but not finished recovery */
2580         if (timeval_compare(&uptime1->last_recovery_finished,
2581                             &uptime1->last_recovery_started) != 1) {
2582                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2583                 talloc_free(mem_ctx);
2584
2585                 return 0;
2586         }
2587
2588         talloc_free(rec->ifaces);
2589         rec->ifaces = talloc_steal(rec, ifaces);
2590
2591         /* verify that we have the ip addresses we should have
2592            and that we don't have ones we shouldn't have.
2593            if we find an inconsistency we ask the recovery master
2594            to perform a takeover run so the addresses can be
2595            redistributed.
2596            also if an address has pnn -1 and we are healthy and can
2597            host it, we request an ip reallocation.
2598         */
2599         if (ctdb->tunable.disable_ip_failover == 0) {
2600                 for (j=0; j<ips->num; j++) {
2601                         if (ips->ips[j].pnn == -1 && nodemap->nodes[pnn].flags == 0) {
2602                                 DEBUG(DEBUG_CRIT,("Public address '%s' is not assigned and we could serve this ip\n",
2603                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2604                                 need_takeover_run = true;
2605                         } else if (ips->ips[j].pnn == pnn) {
2606                                 if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2607                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2608                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2609                                         need_takeover_run = true;
2610                                 }
2611                         } else {
2612                                 if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2613                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2614                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2615                                         need_takeover_run = true;
2616                                 }
2617                         }
2618                 }
2619         }
2620
2621         if (need_takeover_run) {
2622                 struct takeover_run_reply rd;
2623                 TDB_DATA data;
2624
2625                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2626
2627                 rd.pnn = ctdb->pnn;
2628                 rd.srvid = 0;
2629                 data.dptr = (uint8_t *)&rd;
2630                 data.dsize = sizeof(rd);
2631
2632                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2633                 if (ret != 0) {
2634                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2635                 }
2636         }
2637         talloc_free(mem_ctx);
2638         return 0;
2639 }
2640
2641
2642 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2643 {
2644         struct ctdb_node_map **remote_nodemaps = callback_data;
2645
2646         if (node_pnn >= ctdb->num_nodes) {
2647                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2648                 return;
2649         }
2650
2651         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2652
2653 }
2654
2655 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2656         struct ctdb_node_map *nodemap,
2657         struct ctdb_node_map **remote_nodemaps)
2658 {
2659         uint32_t *nodes;
2660
2661         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2662         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2663                                         nodes, 0,
2664                                         CONTROL_TIMEOUT(), false, tdb_null,
2665                                         async_getnodemap_callback,
2666                                         NULL,
2667                                         remote_nodemaps) != 0) {
2668                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2669
2670                 return -1;
2671         }
2672
2673         return 0;
2674 }
2675
2676 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2677 struct ctdb_check_reclock_state {
2678         struct ctdb_context *ctdb;
2679         struct timeval start_time;
2680         int fd[2];
2681         pid_t child;
2682         struct timed_event *te;
2683         struct fd_event *fde;
2684         enum reclock_child_status status;
2685 };
2686
2687 /* when we free the reclock state we must kill any child process.
2688 */
2689 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2690 {
2691         struct ctdb_context *ctdb = state->ctdb;
2692
2693         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2694
2695         if (state->fd[0] != -1) {
2696                 close(state->fd[0]);
2697                 state->fd[0] = -1;
2698         }
2699         if (state->fd[1] != -1) {
2700                 close(state->fd[1]);
2701                 state->fd[1] = -1;
2702         }
2703         kill(state->child, SIGKILL);
2704         return 0;
2705 }
2706
2707 /*
2708   called if our check_reclock child times out. this would happen if
2709   i/o to the reclock file blocks.
2710  */
2711 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2712                                          struct timeval t, void *private_data)
2713 {
2714         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2715                                            struct ctdb_check_reclock_state);
2716
2717         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - CFS slow to grant locks?\n"));
2718         state->status = RECLOCK_TIMEOUT;
2719 }
2720
2721 /* this is called when the child process has completed checking the reclock
2722    file and has written data back to us through the pipe.
2723 */
2724 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2725                              uint16_t flags, void *private_data)
2726 {
2727         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2728                                              struct ctdb_check_reclock_state);
2729         char c = 0;
2730         int ret;
2731
2732         /* we got a response from our child process so we can abort the
2733            timeout.
2734         */
2735         talloc_free(state->te);
2736         state->te = NULL;
2737
2738         ret = read(state->fd[0], &c, 1);
2739         if (ret != 1 || c != RECLOCK_OK) {
2740                 DEBUG(DEBUG_ERR,(__location__ " reclock child process failed or returned error %d\n", c));
2741                 state->status = RECLOCK_FAILED;
2742
2743                 return;
2744         }
2745
2746         state->status = RECLOCK_OK;
2747         return;
2748 }
2749
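/*
  verify that the recovery lock is still usable: fork a child that reads
  from the reclock file and reports the result back through a pipe, then
  wait for either that reply or a timeout
 */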
2750 static int check_recovery_lock(struct ctdb_context *ctdb)
2751 {
2752         int ret;
2753         struct ctdb_check_reclock_state *state;
2754         pid_t parent = getpid();
2755
2756         if (ctdb->recovery_lock_fd == -1) {
2757                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2758                 return -1;
2759         }
2760
2761         state = talloc(ctdb, struct ctdb_check_reclock_state);
2762         CTDB_NO_MEMORY(ctdb, state);
2763
2764         state->ctdb = ctdb;
2765         state->start_time = timeval_current();
2766         state->status = RECLOCK_CHECKING;
2767         state->fd[0] = -1;
2768         state->fd[1] = -1;
2769
2770         ret = pipe(state->fd);
2771         if (ret != 0) {
2772                 talloc_free(state);
2773                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2774                 return -1;
2775         }
2776
2777         state->child = ctdb_fork(ctdb);
2778         if (state->child == (pid_t)-1) {
2779                 DEBUG(DEBUG_CRIT,(__location__ " Failed to fork check_reclock child\n"));
2780                 close(state->fd[0]);
2781                 state->fd[0] = -1;
2782                 close(state->fd[1]);
2783                 state->fd[1] = -1;
2784                 talloc_free(state);
2785                 return -1;
2786         }
2787
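        /* child: check the reclock file once, report the result to the
           parent through the pipe, and stay around (re-sending the result
           every few seconds) until the parent exits */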
2788         if (state->child == 0) {
2789                 char cc = RECLOCK_OK;
2790                 close(state->fd[0]);
2791                 state->fd[0] = -1;
2792
2793                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
2794                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2795                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2796                         cc = RECLOCK_FAILED;
2797                 }
2798
2799                 write(state->fd[1], &cc, 1);
2800                 /* make sure we die when our parent dies */
2801                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2802                         sleep(5);
2803                         write(state->fd[1], &cc, 1);
2804                 }
2805                 _exit(0);
2806         }
2807         close(state->fd[1]);
2808         state->fd[1] = -1;
2809         set_close_on_exec(state->fd[0]);
2810
2811         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2812
2813         talloc_set_destructor(state, check_reclock_destructor);
2814
2815         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2816                                     ctdb_check_reclock_timeout, state);
2817         if (state->te == NULL) {
2818                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2819                 talloc_free(state);
2820                 return -1;
2821         }
2822
2823         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2824                                 EVENT_FD_READ,
2825                                 reclock_child_handler,
2826                                 (void *)state);
2827
2828         if (state->fde == NULL) {
2829                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2830                 talloc_free(state);
2831                 return -1;
2832         }
2833         tevent_fd_set_auto_close(state->fde);
2834
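        /* block until the child has reported a result or the timed event
           above has fired and marked the check as timed out */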
2835         while (state->status == RECLOCK_CHECKING) {
2836                 event_loop_once(ctdb->ev);
2837         }
2838
2839         if (state->status == RECLOCK_FAILED) {
2840                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2841                 close(ctdb->recovery_lock_fd);
2842                 ctdb->recovery_lock_fd = -1;
2843                 talloc_free(state);
2844                 return -1;
2845         }
2846
2847         talloc_free(state);
2848         return 0;
2849 }
2850
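/*
  fetch the current recovery lock file setting from the local daemon and
  update our copy; if the setting has changed or been disabled, close any
  open lock file descriptor
 */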
2851 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2852 {
2853         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2854         const char *reclockfile;
2855
2856         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2857                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2858                 talloc_free(tmp_ctx);
2859                 return -1;      
2860         }
2861
2862         if (reclockfile == NULL) {
2863                 if (ctdb->recovery_lock_file != NULL) {
2864                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2865                         talloc_free(ctdb->recovery_lock_file);
2866                         ctdb->recovery_lock_file = NULL;
2867                         if (ctdb->recovery_lock_fd != -1) {
2868                                 close(ctdb->recovery_lock_fd);
2869                                 ctdb->recovery_lock_fd = -1;
2870                         }
2871                 }
2872                 ctdb->tunable.verify_recovery_lock = 0;
2873                 talloc_free(tmp_ctx);
2874                 return 0;
2875         }
2876
2877         if (ctdb->recovery_lock_file == NULL) {
2878                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2879                 if (ctdb->recovery_lock_fd != -1) {
2880                         close(ctdb->recovery_lock_fd);
2881                         ctdb->recovery_lock_fd = -1;
2882                 }
2883                 talloc_free(tmp_ctx);
2884                 return 0;
2885         }
2886
2887
2888         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2889                 talloc_free(tmp_ctx);
2890                 return 0;
2891         }
2892
2893         talloc_free(ctdb->recovery_lock_file);
2894         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2895         ctdb->tunable.verify_recovery_lock = 0;
2896         if (ctdb->recovery_lock_fd != -1) {
2897                 close(ctdb->recovery_lock_fd);
2898                 ctdb->recovery_lock_fd = -1;
2899         }
2900
2901         talloc_free(tmp_ctx);
2902         return 0;
2903 }
2904
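/*
  one pass of the recovery daemon's monitoring logic: refresh tunables,
  nodemaps and the vnnmap, make sure a valid recovery master exists, and,
  if we are the recovery master, check whether the cluster needs a
  recovery or an IP takeover run
 */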
2905 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2906                       TALLOC_CTX *mem_ctx)
2907 {
2908         uint32_t pnn;
2909         struct ctdb_node_map *nodemap=NULL;
2910         struct ctdb_node_map *recmaster_nodemap=NULL;
2911         struct ctdb_node_map **remote_nodemaps=NULL;
2912         struct ctdb_vnn_map *vnnmap=NULL;
2913         struct ctdb_vnn_map *remote_vnnmap=NULL;
2914         int32_t debug_level;
2915         int i, j, ret;
2916
2917
2918
2919         /* verify that the main daemon is still running */
2920         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2921                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2922                 exit(-1);
2923         }
2924
2925         /* ping the local daemon to tell it we are alive */
2926         ctdb_ctrl_recd_ping(ctdb);
2927
2928         if (rec->election_timeout) {
2929                 /* an election is in progress */
2930                 return;
2931         }
2932
2933         /* read the debug level from the parent and update locally */
2934         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2935         if (ret !=0) {
2936                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2937                 return;
2938         }
2939         LogLevel = debug_level;
2940
2941
2942         /* We must check whether we need to ban a node here, and we want to do
2943            this as early as possible, before we have pulled the nodemap from the
2944            local node. That is why the ban threshold of 20 is hardcoded here.
2945         */
2946         for (i=0; i<ctdb->num_nodes; i++) {
2947                 struct ctdb_banning_state *ban_state;
2948
2949                 if (ctdb->nodes[i]->ban_state == NULL) {
2950                         continue;
2951                 }
2952                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2953                 if (ban_state->count < 20) {
2954                         continue;
2955                 }
2956                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2957                         ctdb->nodes[i]->pnn, ban_state->count,
2958                         ctdb->tunable.recovery_ban_period));
2959                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2960                 ban_state->count = 0;
2961         }
2962
2963         /* get relevant tunables */
2964         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2965         if (ret != 0) {
2966                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2967                 return;
2968         }
2969
2970         /* get the current recovery lock file from the server */
2971         if (update_recovery_lock_file(ctdb) != 0) {
2972                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2973                 return;
2974         }
2975
2976         /* Make sure that if recovery lock verification becomes disabled,
2977            we close the recovery lock file
2978         */
2979         if (ctdb->tunable.verify_recovery_lock == 0) {
2980                 if (ctdb->recovery_lock_fd != -1) {
2981                         close(ctdb->recovery_lock_fd);
2982                         ctdb->recovery_lock_fd = -1;
2983                 }
2984         }
2985
2986         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2987         if (pnn == (uint32_t)-1) {
2988                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2989                 return;
2990         }
2991
2992         /* get the vnnmap */
2993         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2994         if (ret != 0) {
2995                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2996                 return;
2997         }
2998
2999
3000         /* get the nodemap from the local node */
3001         if (rec->nodemap) {
3002                 talloc_free(rec->nodemap);
3003                 rec->nodemap = NULL;
3004                 nodemap=NULL;
3005         }
3006         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3007         if (ret != 0) {
3008                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3009                 return;
3010         }
3011         nodemap = rec->nodemap;
3012
3013         /* update the capabilities for all nodes */
3014         ret = update_capabilities(ctdb, nodemap);
3015         if (ret != 0) {
3016                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3017                 return;
3018         }
3019
3020         /* check which node is the recovery master */
3021         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3022         if (ret != 0) {
3023                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3024                 return;
3025         }
3026
3027         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3028         if (rec->recmaster != pnn) {
3029                 if (rec->ip_reallocate_ctx != NULL) {
3030                         talloc_free(rec->ip_reallocate_ctx);
3031                         rec->ip_reallocate_ctx = NULL;
3032                         rec->reallocate_callers = NULL;
3033                 }
3034         }
3035
3036         if (rec->recmaster == (uint32_t)-1) {
3037                 DEBUG(DEBUG_NOTICE,(__location__ " No recovery master set yet - forcing election\n"));
3038                 force_election(rec, pnn, nodemap);
3039                 return;
3040         }
3041
3042         /* if the local daemon is STOPPED, we verify that the databases are
3043            also frozen and that the recovery mode is set to active
3044         */
3045         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3046                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3047                 if (ret != 0) {
3048                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3049                 }
3050                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3051                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3052
3053                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3054                         if (ret != 0) {
3055                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3056                                 return;
3057                         }
3058                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3059                         if (ret != 0) {
3060                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3061
3062                                 return;
3063                         }
3064                         return;
3065                 }
3066         }
3067         /* If the local node is stopped and we happen to be the recmaster,
3068            yield that role by forcing a new election
3069         */
3070         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3071                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3072                 force_election(rec, pnn, nodemap);
3073                 return;
3074         }
3075         
3076         /*
3077          * if the current recmaster does not have CTDB_CAP_RECMASTER
3078          * but we do, force an election and try to become the new
3079          * recmaster
3080          */
3081         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3082             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3083              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3084                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3085                                   " but we (node %u) have - force an election\n",
3086                                   rec->recmaster, pnn));
3087                 force_election(rec, pnn, nodemap);
3088                 return;
3089         }
3090
3091         /* check that we (the recovery daemon) and the local ctdb daemon
3092            agree on whether we are banned or not
3093         */
3094 /* TODO: this consistency check is not yet implemented */
3095
3096         /* remember our own node flags */
3097         rec->node_flags = nodemap->nodes[pnn].flags;
3098
3099         /* count how many active nodes there are */
3100         rec->num_active    = 0;
3101         rec->num_connected = 0;
3102         for (i=0; i<nodemap->num; i++) {
3103                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3104                         rec->num_active++;
3105                 }
3106                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3107                         rec->num_connected++;
3108                 }
3109         }
3110
3111
3112         /* verify that the recmaster node is still active */
3113         for (j=0; j<nodemap->num; j++) {
3114                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3115                         break;
3116                 }
3117         }
3118
3119         if (j == nodemap->num) {
3120                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3121                 force_election(rec, pnn, nodemap);
3122                 return;
3123         }
3124
3125         /* if recovery master is disconnected we must elect a new recmaster */
3126         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3127                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3128                 force_election(rec, pnn, nodemap);
3129                 return;
3130         }
3131
3132         /* grab the nodemap from the recovery master to check if it is banned */
3133         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3134                                    mem_ctx, &recmaster_nodemap);
3135         if (ret != 0) {
3136                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3137                           nodemap->nodes[j].pnn));
3138                 return;
3139         }
3140
3141
3142         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3143                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3144                 force_election(rec, pnn, nodemap);
3145                 return;
3146         }
3147
3148
3149         /* verify that we have all the public IP addresses we should have,
3150          * and that we don't have addresses we shouldn't have.
3151          */
3152         if (ctdb->tunable.disable_ip_failover == 0) {
3153                 if (rec->ip_check_disable_ctx == NULL) {
3154                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3155                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3156                         }
3157                 }
3158         }
3159
3160
3161         /* if we are not the recmaster then we do not need to check
3162            if recovery is needed
3163          */
3164         if (pnn != rec->recmaster) {
3165                 return;
3166         }
3167
3168
3169         /* ensure our local copies of flags are right */
3170         ret = update_local_flags(rec, nodemap);
3171         if (ret == MONITOR_ELECTION_NEEDED) {
3172                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3173                 force_election(rec, pnn, nodemap);
3174                 return;
3175         }
3176         if (ret != MONITOR_OK) {
3177                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3178                 return;
3179         }
3180
3181         if (ctdb->num_nodes != nodemap->num) {
3182                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3183                 reload_nodes_file(ctdb);
3184                 return;
3185         }
3186
3187         /* verify that all active nodes agree that we are the recmaster */
3188         switch (verify_recmaster(rec, nodemap, pnn)) {
3189         case MONITOR_RECOVERY_NEEDED:
3190                 /* can not happen */
3191                 return;
3192         case MONITOR_ELECTION_NEEDED:
3193                 force_election(rec, pnn, nodemap);
3194                 return;
3195         case MONITOR_OK:
3196                 break;
3197         case MONITOR_FAILED:
3198                 return;
3199         }
3200
3201
3202         if (rec->need_recovery) {
3203                 /* a previous recovery didn't finish */
3204                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3205                 return;
3206         }
3207
3208         /* verify that all active nodes are in normal mode 
3209            and not in recovery mode 
3210         */
3211         switch (verify_recmode(ctdb, nodemap)) {
3212         case MONITOR_RECOVERY_NEEDED:
3213                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3214                 return;
3215         case MONITOR_FAILED:
3216                 return;
3217         case MONITOR_ELECTION_NEEDED:
3218                 /* can not happen */
3219         case MONITOR_OK:
3220                 break;
3221         }
3222
3223
3224         if (ctdb->tunable.verify_recovery_lock != 0) {
3225                 /* we should have the reclock - check it's not stale */
3226                 ret = check_recovery_lock(ctdb);
3227                 if (ret != 0) {
3228                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3229                         ctdb_set_culprit(rec, ctdb->pnn);
3230                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3231                         return;
3232                 }
3233         }
3234
3235         /* if any takeover runs have been requested, perform them and notify the waiters */
3236         if (rec->reallocate_callers) {
3237                 process_ipreallocate_requests(ctdb, rec);
3238         }
3239
3240         /* get the nodemap for all active remote nodes
3241          */
3242         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3243         if (remote_nodemaps == NULL) {
3244                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3245                 return;
3246         }
3247         for(i=0; i<nodemap->num; i++) {
3248                 remote_nodemaps[i] = NULL;
3249         }
3250         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3251                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3252                 return;
3253         } 
3254
3255         /* verify that all other nodes have the same nodemap as we have
3256         */
3257         for (j=0; j<nodemap->num; j++) {
3258                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3259                         continue;
3260                 }
3261
3262                 if (remote_nodemaps[j] == NULL) {
3263                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3264                         ctdb_set_culprit(rec, j);
3265
3266                         return;
3267                 }
3268
3269                 /* if the nodes disagree on how many nodes there are
3270                    then this is a good reason to try recovery
3271                  */
3272                 if (remote_nodemaps[j]->num != nodemap->num) {
3273                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3274                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3275                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3276                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3277                         return;
3278                 }
3279
3280                 /* if the nodes disagree on which nodes exist and are
3281                    active, then that is also a good reason to do recovery
3282                  */
3283                 for (i=0;i<nodemap->num;i++) {
3284                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3285                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3286                                           nodemap->nodes[j].pnn, i, 
3287                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3288                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3289                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3290                                             vnnmap);
3291                                 return;
3292                         }
3293                 }
3294
3295                 /* verify the flags are consistent
3296                 */
3297                 for (i=0; i<nodemap->num; i++) {
3298                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3299                                 continue;
3300                         }
3301                         
3302                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3303                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3304                                   nodemap->nodes[j].pnn, 
3305                                   nodemap->nodes[i].pnn, 
3306                                   remote_nodemaps[j]->nodes[i].flags,
3307                                   nodemap->nodes[i].flags));
3308                                 if (i == j) {
3309                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3310                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3311                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3312                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3313                                                     vnnmap);
3314                                         return;
3315                                 } else {
3316                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3317                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3318                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3319                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3320                                                     vnnmap);
3321                                         return;
3322                                 }
3323                         }
3324                 }
3325         }
3326
3327
3328         /* there must be exactly as many lmasters in the vnnmap as there
3329            are active nodes, otherwise we have to do a recovery
3330          */
3331         if (vnnmap->size != rec->num_active) {
3332                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3333                           vnnmap->size, rec->num_active));
3334                 ctdb_set_culprit(rec, ctdb->pnn);
3335                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3336                 return;
3337         }
3338
3339         /* verify that all active nodes in the nodemap also exist in 
3340            the vnnmap.
3341          */
3342         for (j=0; j<nodemap->num; j++) {
3343                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3344                         continue;
3345                 }
3346                 if (nodemap->nodes[j].pnn == pnn) {
3347                         continue;
3348                 }
3349
3350                 for (i=0; i<vnnmap->size; i++) {
3351                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3352                                 break;
3353                         }
3354                 }
3355                 if (i == vnnmap->size) {
3356                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3357                                   nodemap->nodes[j].pnn));
3358                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3359                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3360                         return;
3361                 }
3362         }
3363
3364         
3365         /* verify that all other nodes have the same vnnmap
3366            and are from the same generation
3367          */
3368         for (j=0; j<nodemap->num; j++) {
3369                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3370                         continue;
3371                 }
3372                 if (nodemap->nodes[j].pnn == pnn) {
3373                         continue;
3374                 }
3375
3376                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3377                                           mem_ctx, &remote_vnnmap);
3378                 if (ret != 0) {
3379                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3380                                   nodemap->nodes[j].pnn));
3381                         return;
3382                 }
3383
3384                 /* verify the vnnmap generation is the same */
3385                 if (vnnmap->generation != remote_vnnmap->generation) {
3386                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3387                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3388                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3389                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3390                         return;
3391                 }
3392
3393                 /* verify the vnnmap size is the same */
3394                 if (vnnmap->size != remote_vnnmap->size) {
3395                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3396                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3397                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3398                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3399                         return;
3400                 }
3401
3402                 /* verify the vnnmap is the same */
3403                 for (i=0;i<vnnmap->size;i++) {
3404                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3405                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3406                                           nodemap->nodes[j].pnn));
3407                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3408                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3409                                             vnnmap);
3410                                 return;
3411                         }
3412                 }
3413         }
3414
3415         /* we might need to change who has what IP assigned */
3416         if (rec->need_takeover_run) {
3417                 uint32_t culprit = (uint32_t)-1;
3418
3419                 rec->need_takeover_run = false;
3420
3421                 /* update the list of public ips that a node can handle for
3422                    all connected nodes
3423                 */
3424                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3425                 if (ret != 0) {
3426                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3427                                          culprit));
3428                         rec->need_takeover_run = true;
3429                         return;
3430                 }
3431
3432                 /* execute the "startrecovery" event script on all nodes */
3433                 ret = run_startrecovery_eventscript(rec, nodemap);
3434                 if (ret!=0) {
3435                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3436                         ctdb_set_culprit(rec, ctdb->pnn);
3437                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3438                         return;
3439                 }
3440
3441                 ret = ctdb_takeover_run(ctdb, nodemap);
3442                 if (ret != 0) {
3443                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Try again later\n"));
3444                         return;
3445                 }
3446
3447                 /* execute the "recovered" event script on all nodes */
3448                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3449 #if 0
3450 // we can't check whether the event completed successfully,
3451 // since this script WILL fail if the node is in recovery mode,
3452 // and if that race happens, the code here would just cause a second,
3453 // cascading recovery.
3454                 if (ret!=0) {
3455                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3456                         ctdb_set_culprit(rec, ctdb->pnn);
3457                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3458                 }
3459 #endif
3460         }
3461 }
3462
3463 /*
3464   the main monitoring loop
3465  */
3466 static void monitor_cluster(struct ctdb_context *ctdb)
3467 {
3468         struct ctdb_recoverd *rec;
3469
3470         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3471
3472         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3473         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3474
3475         rec->ctdb = ctdb;
3476
3477         rec->priority_time = timeval_current();
3478
3479         /* register a message port for sending memory dumps */
3480         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3481
3482         /* register a message port for recovery elections */
3483         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3484
3485         /* when nodes are disabled/enabled */
3486         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3487
3488         /* when we are asked to push out a flag change */
3489         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3490
3491         /* register a message port for vacuum fetch */
3492         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3493
3494         /* register a message port for reloadnodes  */
3495         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3496
3497         /* register a message port for performing a takeover run */
3498         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3499
3500         /* register a message port for disabling the ip check for a short while */
3501         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3502
3503         /* register a message port for updating the recovery daemons node assignment for an ip */
3504         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3505
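        /* the main loop: run one monitoring pass, then sleep for whatever is
           left of the recovery interval before the next pass */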
3506         for (;;) {
3507                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3508                 struct timeval start;
3509                 double elapsed;
3510
3511                 if (!mem_ctx) {
3512                         DEBUG(DEBUG_CRIT,(__location__
3513                                           " Failed to create temp context\n"));
3514                         exit(-1);
3515                 }
3516
3517                 start = timeval_current();
3518                 main_loop(ctdb, rec, mem_ctx);
3519                 talloc_free(mem_ctx);
3520
3521                 /* we only run the recovery check once per recovery interval */
3522                 elapsed = timeval_elapsed(&start);
3523                 if (elapsed < ctdb->tunable.recover_interval) {
3524                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3525                                           - elapsed);
3526                 }
3527         }
3528 }
3529
3530 /*
3531   event handler for when the main ctdbd dies
3532  */
3533 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3534                                  uint16_t flags, void *private_data)
3535 {
3536         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3537         _exit(1);
3538 }
3539
3540 /*
3541   called regularly to verify that the recovery daemon is still running
3542  */
3543 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3544                               struct timeval yt, void *p)
3545 {
3546         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3547
3548         if (kill(ctdb->recoverd_pid, 0) != 0) {
3549                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3550
3551                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3552                                 ctdb_restart_recd, ctdb);
3553
3554                 return;
3555         }
3556
3557         event_add_timed(ctdb->ev, ctdb, 
3558                         timeval_current_ofs(30, 0),
3559                         ctdb_check_recd, ctdb);
3560 }
3561
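/*
  SIGCHLD handler for the recovery daemon: reap all exited child
  processes so that they do not remain as zombies
 */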
3562 static void recd_sig_child_handler(struct event_context *ev,
3563         struct signal_event *se, int signum, int count,
3564         void *dont_care, 
3565         void *private_data)
3566 {
3567 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3568         int status;
3569         pid_t pid = -1;
3570
3571         while (pid != 0) {
3572                 pid = waitpid(-1, &status, WNOHANG);
3573                 if (pid == -1) {
3574                         if (errno != ECHILD) {
3575                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3576                         }
3577                         return;
3578                 }
3579                 if (pid > 0) {
3580                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3581                 }
3582         }
3583 }
3584
3585 /*
3586   start up the recovery daemon as a child of the main ctdb daemon
3587  */
3588 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3589 {
3590         int fd[2];
3591         struct signal_event *se;
3592         struct tevent_fd *fde;
3593
3594         if (pipe(fd) != 0) {
3595                 return -1;
3596         }
3597
3598         ctdb->ctdbd_pid = getpid();
3599
3600         ctdb->recoverd_pid = fork();
3601         if (ctdb->recoverd_pid == -1) {
3602                 return -1;
3603         }
3604         
3605         if (ctdb->recoverd_pid != 0) {
3606                 close(fd[0]);
3607                 event_add_timed(ctdb->ev, ctdb, 
3608                                 timeval_current_ofs(30, 0),
3609                                 ctdb_check_recd, ctdb);
3610                 return 0;
3611         }
3612
3613         close(fd[1]);
3614
3615         srandom(getpid() ^ time(NULL));
3616
3617         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3618                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. Shutting down.\n"));
3619                 exit(1);
3620         }
3621
3622         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3623
3624         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3625                      ctdb_recoverd_parent, &fd[0]);     
3626         tevent_fd_set_auto_close(fde);
3627
3628         /* set up a handler to pick up sigchld */
3629         se = event_add_signal(ctdb->ev, ctdb,
3630                                      SIGCHLD, 0,
3631                                      recd_sig_child_handler,
3632                                      ctdb);
3633         if (se == NULL) {
3634                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3635                 exit(1);
3636         }
3637
3638         monitor_cluster(ctdb);
3639
3640         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3641         return -1;
3642 }
3643
3644 /*
3645   shut down the recovery daemon
3646  */
3647 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3648 {
3649         if (ctdb->recoverd_pid == 0) {
3650                 return;
3651         }
3652
3653         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3654         kill(ctdb->recoverd_pid, SIGTERM);
3655 }
3656
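/*
  timed event callback used by ctdb_check_recd(): stop any old recovery
  daemon and start a new one
 */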
3657 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
3658                        struct timeval t, void *private_data)
3659 {
3660         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3661
3662         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3663         ctdb_stop_recoverd(ctdb);
3664         ctdb_start_recoverd(ctdb);
3665 }