dont check the public ip assignment or if even we are hosting them and shouldnt
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
70 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
71 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
72
73
74 /*
75   ban a node for a period of time
76  */
77 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
78 {
79         int ret;
80         struct ctdb_context *ctdb = rec->ctdb;
81         struct ctdb_ban_time bantime;
82        
83         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
84
85         if (!ctdb_validate_pnn(ctdb, pnn)) {
86                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
87                 return;
88         }
89
90         bantime.pnn  = pnn;
91         bantime.time = ban_time;
92
93         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
94         if (ret != 0) {
95                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
96                 return;
97         }
98
99 }
100
101 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
102
103
104 /*
105   run the "recovered" eventscript on all nodes
106  */
107 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
108 {
109         TALLOC_CTX *tmp_ctx;
110         uint32_t *nodes;
111
112         tmp_ctx = talloc_new(ctdb);
113         CTDB_NO_MEMORY(ctdb, tmp_ctx);
114
115         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
116         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
117                                         nodes, 0,
118                                         CONTROL_TIMEOUT(), false, tdb_null,
119                                         NULL, NULL,
120                                         NULL) != 0) {
121                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
122
123                 talloc_free(tmp_ctx);
124                 return -1;
125         }
126
127         talloc_free(tmp_ctx);
128         return 0;
129 }
130
131 /*
132   remember the trouble maker
133  */
134 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
135 {
136         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
137         struct ctdb_banning_state *ban_state;
138
139         if (culprit > ctdb->num_nodes) {
140                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
141                 return;
142         }
143
144         if (ctdb->nodes[culprit]->ban_state == NULL) {
145                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
146                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
147
148                 
149         }
150         ban_state = ctdb->nodes[culprit]->ban_state;
151         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
152                 /* this was the first time in a long while this node
153                    misbehaved so we will forgive any old transgressions.
154                 */
155                 ban_state->count = 0;
156         }
157
158         ban_state->count += count;
159         ban_state->last_reported_time = timeval_current();
160         rec->last_culprit_node = culprit;
161 }
162
163 /*
164   remember the trouble maker
165  */
166 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
167 {
168         ctdb_set_culprit_count(rec, culprit, 1);
169 }
170
171
172 /* this callback is called for every node that failed to execute the
173    start recovery event
174 */
175 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
176 {
177         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
178
179         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
180
181         ctdb_set_culprit(rec, node_pnn);
182 }
183
184 /*
185   run the "startrecovery" eventscript on all nodes
186  */
187 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
188 {
189         TALLOC_CTX *tmp_ctx;
190         uint32_t *nodes;
191         struct ctdb_context *ctdb = rec->ctdb;
192
193         tmp_ctx = talloc_new(ctdb);
194         CTDB_NO_MEMORY(ctdb, tmp_ctx);
195
196         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
198                                         nodes, 0,
199                                         CONTROL_TIMEOUT(), false, tdb_null,
200                                         NULL,
201                                         startrecovery_fail_callback,
202                                         rec) != 0) {
203                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
204                 talloc_free(tmp_ctx);
205                 return -1;
206         }
207
208         talloc_free(tmp_ctx);
209         return 0;
210 }
211
212 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
213 {
214         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
215                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
216                 return;
217         }
218         if (node_pnn < ctdb->num_nodes) {
219                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
220         }
221 }
222
223 /*
224   update the node capabilities for all connected nodes
225  */
226 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
227 {
228         uint32_t *nodes;
229         TALLOC_CTX *tmp_ctx;
230
231         tmp_ctx = talloc_new(ctdb);
232         CTDB_NO_MEMORY(ctdb, tmp_ctx);
233
234         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
235         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
236                                         nodes, 0,
237                                         CONTROL_TIMEOUT(),
238                                         false, tdb_null,
239                                         async_getcap_callback, NULL,
240                                         NULL) != 0) {
241                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
242                 talloc_free(tmp_ctx);
243                 return -1;
244         }
245
246         talloc_free(tmp_ctx);
247         return 0;
248 }
249
250 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
251 {
252         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
253
254         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
255         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
256 }
257
258 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
259 {
260         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
261
262         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
263         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
264 }
265
266 /*
267   change recovery mode on all nodes
268  */
269 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
270 {
271         TDB_DATA data;
272         uint32_t *nodes;
273         TALLOC_CTX *tmp_ctx;
274
275         tmp_ctx = talloc_new(ctdb);
276         CTDB_NO_MEMORY(ctdb, tmp_ctx);
277
278         /* freeze all nodes */
279         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
280         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
281                 int i;
282
283                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
284                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
285                                                 nodes, i,
286                                                 CONTROL_TIMEOUT(),
287                                                 false, tdb_null,
288                                                 NULL,
289                                                 set_recmode_fail_callback,
290                                                 rec) != 0) {
291                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
292                                 talloc_free(tmp_ctx);
293                                 return -1;
294                         }
295                 }
296         }
297
298
299         data.dsize = sizeof(uint32_t);
300         data.dptr = (unsigned char *)&rec_mode;
301
302         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
303                                         nodes, 0,
304                                         CONTROL_TIMEOUT(),
305                                         false, data,
306                                         NULL, NULL,
307                                         NULL) != 0) {
308                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
309                 talloc_free(tmp_ctx);
310                 return -1;
311         }
312
313         talloc_free(tmp_ctx);
314         return 0;
315 }
316
317 /*
318   change recovery master on all node
319  */
320 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
321 {
322         TDB_DATA data;
323         TALLOC_CTX *tmp_ctx;
324         uint32_t *nodes;
325
326         tmp_ctx = talloc_new(ctdb);
327         CTDB_NO_MEMORY(ctdb, tmp_ctx);
328
329         data.dsize = sizeof(uint32_t);
330         data.dptr = (unsigned char *)&pnn;
331
332         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
333         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
334                                         nodes, 0,
335                                         CONTROL_TIMEOUT(), false, data,
336                                         NULL, NULL,
337                                         NULL) != 0) {
338                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
339                 talloc_free(tmp_ctx);
340                 return -1;
341         }
342
343         talloc_free(tmp_ctx);
344         return 0;
345 }
346
347 /* update all remote nodes to use the same db priority that we have
348    this can fail if the remove node has not yet been upgraded to 
349    support this function, so we always return success and never fail
350    a recovery if this call fails.
351 */
352 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
353         struct ctdb_node_map *nodemap, 
354         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
355 {
356         int db;
357         uint32_t *nodes;
358
359         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
360
361         /* step through all local databases */
362         for (db=0; db<dbmap->num;db++) {
363                 TDB_DATA data;
364                 struct ctdb_db_priority db_prio;
365                 int ret;
366
367                 db_prio.db_id     = dbmap->dbs[db].dbid;
368                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
369                 if (ret != 0) {
370                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
371                         continue;
372                 }
373
374                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
375
376                 data.dptr  = (uint8_t *)&db_prio;
377                 data.dsize = sizeof(db_prio);
378
379                 if (ctdb_client_async_control(ctdb,
380                                         CTDB_CONTROL_SET_DB_PRIORITY,
381                                         nodes, 0,
382                                         CONTROL_TIMEOUT(), false, data,
383                                         NULL, NULL,
384                                         NULL) != 0) {
385                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
386                 }
387         }
388
389         return 0;
390 }                       
391
392 /*
393   ensure all other nodes have attached to any databases that we have
394  */
395 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
396                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
397 {
398         int i, j, db, ret;
399         struct ctdb_dbid_map *remote_dbmap;
400
401         /* verify that all other nodes have all our databases */
402         for (j=0; j<nodemap->num; j++) {
403                 /* we dont need to ourself ourselves */
404                 if (nodemap->nodes[j].pnn == pnn) {
405                         continue;
406                 }
407                 /* dont check nodes that are unavailable */
408                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
409                         continue;
410                 }
411
412                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
413                                          mem_ctx, &remote_dbmap);
414                 if (ret != 0) {
415                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
416                         return -1;
417                 }
418
419                 /* step through all local databases */
420                 for (db=0; db<dbmap->num;db++) {
421                         const char *name;
422
423
424                         for (i=0;i<remote_dbmap->num;i++) {
425                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
426                                         break;
427                                 }
428                         }
429                         /* the remote node already have this database */
430                         if (i!=remote_dbmap->num) {
431                                 continue;
432                         }
433                         /* ok so we need to create this database */
434                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
435                                             mem_ctx, &name);
436                         if (ret != 0) {
437                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
438                                 return -1;
439                         }
440                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
441                                            mem_ctx, name, dbmap->dbs[db].persistent);
442                         if (ret != 0) {
443                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
444                                 return -1;
445                         }
446                 }
447         }
448
449         return 0;
450 }
451
452
453 /*
454   ensure we are attached to any databases that anyone else is attached to
455  */
456 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
457                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
458 {
459         int i, j, db, ret;
460         struct ctdb_dbid_map *remote_dbmap;
461
462         /* verify that we have all database any other node has */
463         for (j=0; j<nodemap->num; j++) {
464                 /* we dont need to ourself ourselves */
465                 if (nodemap->nodes[j].pnn == pnn) {
466                         continue;
467                 }
468                 /* dont check nodes that are unavailable */
469                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
470                         continue;
471                 }
472
473                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
474                                          mem_ctx, &remote_dbmap);
475                 if (ret != 0) {
476                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
477                         return -1;
478                 }
479
480                 /* step through all databases on the remote node */
481                 for (db=0; db<remote_dbmap->num;db++) {
482                         const char *name;
483
484                         for (i=0;i<(*dbmap)->num;i++) {
485                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
486                                         break;
487                                 }
488                         }
489                         /* we already have this db locally */
490                         if (i!=(*dbmap)->num) {
491                                 continue;
492                         }
493                         /* ok so we need to create this database and
494                            rebuild dbmap
495                          */
496                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
497                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
498                         if (ret != 0) {
499                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
500                                           nodemap->nodes[j].pnn));
501                                 return -1;
502                         }
503                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
504                                            remote_dbmap->dbs[db].persistent);
505                         if (ret != 0) {
506                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
507                                 return -1;
508                         }
509                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
510                         if (ret != 0) {
511                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
512                                 return -1;
513                         }
514                 }
515         }
516
517         return 0;
518 }
519
520
521 /*
522   pull the remote database contents from one node into the recdb
523  */
524 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
525                                     struct tdb_wrap *recdb, uint32_t dbid,
526                                     bool persistent)
527 {
528         int ret;
529         TDB_DATA outdata;
530         struct ctdb_marshall_buffer *reply;
531         struct ctdb_rec_data *rec;
532         int i;
533         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
534
535         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
536                                CONTROL_TIMEOUT(), &outdata);
537         if (ret != 0) {
538                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
539                 talloc_free(tmp_ctx);
540                 return -1;
541         }
542
543         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
544
545         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
546                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
547                 talloc_free(tmp_ctx);
548                 return -1;
549         }
550         
551         rec = (struct ctdb_rec_data *)&reply->data[0];
552         
553         for (i=0;
554              i<reply->count;
555              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
556                 TDB_DATA key, data;
557                 struct ctdb_ltdb_header *hdr;
558                 TDB_DATA existing;
559                 
560                 key.dptr = &rec->data[0];
561                 key.dsize = rec->keylen;
562                 data.dptr = &rec->data[key.dsize];
563                 data.dsize = rec->datalen;
564                 
565                 hdr = (struct ctdb_ltdb_header *)data.dptr;
566
567                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
568                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
569                         talloc_free(tmp_ctx);
570                         return -1;
571                 }
572
573                 /* fetch the existing record, if any */
574                 existing = tdb_fetch(recdb->tdb, key);
575                 
576                 if (existing.dptr != NULL) {
577                         struct ctdb_ltdb_header header;
578                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
579                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
580                                          (unsigned)existing.dsize, srcnode));
581                                 free(existing.dptr);
582                                 talloc_free(tmp_ctx);
583                                 return -1;
584                         }
585                         header = *(struct ctdb_ltdb_header *)existing.dptr;
586                         free(existing.dptr);
587                         if (!(header.rsn < hdr->rsn ||
588                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
589                                 continue;
590                         }
591                 }
592                 
593                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
594                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
595                         talloc_free(tmp_ctx);
596                         return -1;                              
597                 }
598         }
599
600         talloc_free(tmp_ctx);
601
602         return 0;
603 }
604
605 /*
606   pull all the remote database contents into the recdb
607  */
608 static int pull_remote_database(struct ctdb_context *ctdb,
609                                 struct ctdb_recoverd *rec, 
610                                 struct ctdb_node_map *nodemap, 
611                                 struct tdb_wrap *recdb, uint32_t dbid,
612                                 bool persistent)
613 {
614         int j;
615
616         /* pull all records from all other nodes across onto this node
617            (this merges based on rsn)
618         */
619         for (j=0; j<nodemap->num; j++) {
620                 /* dont merge from nodes that are unavailable */
621                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
622                         continue;
623                 }
624                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
625                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
626                                  nodemap->nodes[j].pnn));
627                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
628                         return -1;
629                 }
630         }
631         
632         return 0;
633 }
634
635
636 /*
637   update flags on all active nodes
638  */
639 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
640 {
641         int ret;
642
643         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
644                 if (ret != 0) {
645                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
646                 return -1;
647         }
648
649         return 0;
650 }
651
652 /*
653   ensure all nodes have the same vnnmap we do
654  */
655 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
656                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
657 {
658         int j, ret;
659
660         /* push the new vnn map out to all the nodes */
661         for (j=0; j<nodemap->num; j++) {
662                 /* dont push to nodes that are unavailable */
663                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
664                         continue;
665                 }
666
667                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
668                 if (ret != 0) {
669                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
670                         return -1;
671                 }
672         }
673
674         return 0;
675 }
676
677
678 struct vacuum_info {
679         struct vacuum_info *next, *prev;
680         struct ctdb_recoverd *rec;
681         uint32_t srcnode;
682         struct ctdb_db_context *ctdb_db;
683         struct ctdb_marshall_buffer *recs;
684         struct ctdb_rec_data *r;
685 };
686
687 static void vacuum_fetch_next(struct vacuum_info *v);
688
689 /*
690   called when a vacuum fetch has completed - just free it and do the next one
691  */
692 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
693 {
694         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
695         talloc_free(state);
696         vacuum_fetch_next(v);
697 }
698
699
700 /*
701   process the next element from the vacuum list
702 */
703 static void vacuum_fetch_next(struct vacuum_info *v)
704 {
705         struct ctdb_call call;
706         struct ctdb_rec_data *r;
707
708         while (v->recs->count) {
709                 struct ctdb_client_call_state *state;
710                 TDB_DATA data;
711                 struct ctdb_ltdb_header *hdr;
712
713                 ZERO_STRUCT(call);
714                 call.call_id = CTDB_NULL_FUNC;
715                 call.flags = CTDB_IMMEDIATE_MIGRATION;
716
717                 r = v->r;
718                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
719                 v->recs->count--;
720
721                 call.key.dptr = &r->data[0];
722                 call.key.dsize = r->keylen;
723
724                 /* ensure we don't block this daemon - just skip a record if we can't get
725                    the chainlock */
726                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
727                         continue;
728                 }
729
730                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
731                 if (data.dptr == NULL) {
732                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
733                         continue;
734                 }
735
736                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
737                         free(data.dptr);
738                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
739                         continue;
740                 }
741                 
742                 hdr = (struct ctdb_ltdb_header *)data.dptr;
743                 if (hdr->dmaster == v->rec->ctdb->pnn) {
744                         /* its already local */
745                         free(data.dptr);
746                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
747                         continue;
748                 }
749
750                 free(data.dptr);
751
752                 state = ctdb_call_send(v->ctdb_db, &call);
753                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
754                 if (state == NULL) {
755                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
756                         talloc_free(v);
757                         return;
758                 }
759                 state->async.fn = vacuum_fetch_callback;
760                 state->async.private_data = v;
761                 return;
762         }
763
764         talloc_free(v);
765 }
766
767
768 /*
769   destroy a vacuum info structure
770  */
771 static int vacuum_info_destructor(struct vacuum_info *v)
772 {
773         DLIST_REMOVE(v->rec->vacuum_info, v);
774         return 0;
775 }
776
777
778 /*
779   handler for vacuum fetch
780 */
781 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
782                                  TDB_DATA data, void *private_data)
783 {
784         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
785         struct ctdb_marshall_buffer *recs;
786         int ret, i;
787         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
788         const char *name;
789         struct ctdb_dbid_map *dbmap=NULL;
790         bool persistent = false;
791         struct ctdb_db_context *ctdb_db;
792         struct ctdb_rec_data *r;
793         uint32_t srcnode;
794         struct vacuum_info *v;
795
796         recs = (struct ctdb_marshall_buffer *)data.dptr;
797         r = (struct ctdb_rec_data *)&recs->data[0];
798
799         if (recs->count == 0) {
800                 talloc_free(tmp_ctx);
801                 return;
802         }
803
804         srcnode = r->reqid;
805
806         for (v=rec->vacuum_info;v;v=v->next) {
807                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
808                         /* we're already working on records from this node */
809                         talloc_free(tmp_ctx);
810                         return;
811                 }
812         }
813
814         /* work out if the database is persistent */
815         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
816         if (ret != 0) {
817                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
818                 talloc_free(tmp_ctx);
819                 return;
820         }
821
822         for (i=0;i<dbmap->num;i++) {
823                 if (dbmap->dbs[i].dbid == recs->db_id) {
824                         persistent = dbmap->dbs[i].persistent;
825                         break;
826                 }
827         }
828         if (i == dbmap->num) {
829                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
830                 talloc_free(tmp_ctx);
831                 return;         
832         }
833
834         /* find the name of this database */
835         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
836                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
837                 talloc_free(tmp_ctx);
838                 return;
839         }
840
841         /* attach to it */
842         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
843         if (ctdb_db == NULL) {
844                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
845                 talloc_free(tmp_ctx);
846                 return;
847         }
848
849         v = talloc_zero(rec, struct vacuum_info);
850         if (v == NULL) {
851                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
852                 talloc_free(tmp_ctx);
853                 return;
854         }
855
856         v->rec = rec;
857         v->srcnode = srcnode;
858         v->ctdb_db = ctdb_db;
859         v->recs = talloc_memdup(v, recs, data.dsize);
860         if (v->recs == NULL) {
861                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
862                 talloc_free(v);
863                 talloc_free(tmp_ctx);
864                 return;         
865         }
866         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
867
868         DLIST_ADD(rec->vacuum_info, v);
869
870         talloc_set_destructor(v, vacuum_info_destructor);
871
872         vacuum_fetch_next(v);
873         talloc_free(tmp_ctx);
874 }
875
876
877 /*
878   called when ctdb_wait_timeout should finish
879  */
880 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
881                               struct timeval yt, void *p)
882 {
883         uint32_t *timed_out = (uint32_t *)p;
884         (*timed_out) = 1;
885 }
886
887 /*
888   wait for a given number of seconds
889  */
890 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
891 {
892         uint32_t timed_out = 0;
893         time_t usecs = (secs - (time_t)secs) * 1000000;
894         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
895         while (!timed_out) {
896                 event_loop_once(ctdb->ev);
897         }
898 }
899
900 /*
901   called when an election times out (ends)
902  */
903 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
904                                   struct timeval t, void *p)
905 {
906         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
907         rec->election_timeout = NULL;
908         fast_start = false;
909
910         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
911 }
912
913
914 /*
915   wait for an election to finish. It finished election_timeout seconds after
916   the last election packet is received
917  */
918 static void ctdb_wait_election(struct ctdb_recoverd *rec)
919 {
920         struct ctdb_context *ctdb = rec->ctdb;
921         while (rec->election_timeout) {
922                 event_loop_once(ctdb->ev);
923         }
924 }
925
926 /*
927   Update our local flags from all remote connected nodes. 
928   This is only run when we are or we belive we are the recovery master
929  */
930 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
931 {
932         int j;
933         struct ctdb_context *ctdb = rec->ctdb;
934         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
935
936         /* get the nodemap for all active remote nodes and verify
937            they are the same as for this node
938          */
939         for (j=0; j<nodemap->num; j++) {
940                 struct ctdb_node_map *remote_nodemap=NULL;
941                 int ret;
942
943                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
944                         continue;
945                 }
946                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
947                         continue;
948                 }
949
950                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
951                                            mem_ctx, &remote_nodemap);
952                 if (ret != 0) {
953                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
954                                   nodemap->nodes[j].pnn));
955                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
956                         talloc_free(mem_ctx);
957                         return MONITOR_FAILED;
958                 }
959                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
960                         /* We should tell our daemon about this so it
961                            updates its flags or else we will log the same 
962                            message again in the next iteration of recovery.
963                            Since we are the recovery master we can just as
964                            well update the flags on all nodes.
965                         */
966                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
967                         if (ret != 0) {
968                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
969                                 return -1;
970                         }
971
972                         /* Update our local copy of the flags in the recovery
973                            daemon.
974                         */
975                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
976                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
977                                  nodemap->nodes[j].flags));
978                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
979                 }
980                 talloc_free(remote_nodemap);
981         }
982         talloc_free(mem_ctx);
983         return MONITOR_OK;
984 }
985
986
987 /* Create a new random generation ip. 
988    The generation id can not be the INVALID_GENERATION id
989 */
990 static uint32_t new_generation(void)
991 {
992         uint32_t generation;
993
994         while (1) {
995                 generation = random();
996
997                 if (generation != INVALID_GENERATION) {
998                         break;
999                 }
1000         }
1001
1002         return generation;
1003 }
1004
1005
1006 /*
1007   create a temporary working database
1008  */
1009 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1010 {
1011         char *name;
1012         struct tdb_wrap *recdb;
1013         unsigned tdb_flags;
1014
1015         /* open up the temporary recovery database */
1016         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1017                                ctdb->db_directory_state,
1018                                ctdb->pnn);
1019         if (name == NULL) {
1020                 return NULL;
1021         }
1022         unlink(name);
1023
1024         tdb_flags = TDB_NOLOCK;
1025         if (ctdb->valgrinding) {
1026                 tdb_flags |= TDB_NOMMAP;
1027         }
1028         tdb_flags |= TDB_DISALLOW_NESTING;
1029
1030         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1031                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1032         if (recdb == NULL) {
1033                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1034         }
1035
1036         talloc_free(name);
1037
1038         return recdb;
1039 }
1040
1041
1042 /* 
1043    a traverse function for pulling all relevent records from recdb
1044  */
1045 struct recdb_data {
1046         struct ctdb_context *ctdb;
1047         struct ctdb_marshall_buffer *recdata;
1048         uint32_t len;
1049         bool failed;
1050         bool persistent;
1051 };
1052
1053 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1054 {
1055         struct recdb_data *params = (struct recdb_data *)p;
1056         struct ctdb_rec_data *rec;
1057         struct ctdb_ltdb_header *hdr;
1058
1059         /* skip empty records */
1060         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1061                 return 0;
1062         }
1063
1064         /* update the dmaster field to point to us */
1065         hdr = (struct ctdb_ltdb_header *)data.dptr;
1066         if (!params->persistent) {
1067                 hdr->dmaster = params->ctdb->pnn;
1068         }
1069
1070         /* add the record to the blob ready to send to the nodes */
1071         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1072         if (rec == NULL) {
1073                 params->failed = true;
1074                 return -1;
1075         }
1076         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1077         if (params->recdata == NULL) {
1078                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1079                          rec->length + params->len, params->recdata->count));
1080                 params->failed = true;
1081                 return -1;
1082         }
1083         params->recdata->count++;
1084         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1085         params->len += rec->length;
1086         talloc_free(rec);
1087
1088         return 0;
1089 }
1090
1091 /*
1092   push the recdb database out to all nodes
1093  */
1094 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1095                                bool persistent,
1096                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1097 {
1098         struct recdb_data params;
1099         struct ctdb_marshall_buffer *recdata;
1100         TDB_DATA outdata;
1101         TALLOC_CTX *tmp_ctx;
1102         uint32_t *nodes;
1103
1104         tmp_ctx = talloc_new(ctdb);
1105         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1106
1107         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1108         CTDB_NO_MEMORY(ctdb, recdata);
1109
1110         recdata->db_id = dbid;
1111
1112         params.ctdb = ctdb;
1113         params.recdata = recdata;
1114         params.len = offsetof(struct ctdb_marshall_buffer, data);
1115         params.failed = false;
1116         params.persistent = persistent;
1117
1118         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1119                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1120                 talloc_free(params.recdata);
1121                 talloc_free(tmp_ctx);
1122                 return -1;
1123         }
1124
1125         if (params.failed) {
1126                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1127                 talloc_free(params.recdata);
1128                 talloc_free(tmp_ctx);
1129                 return -1;              
1130         }
1131
1132         recdata = params.recdata;
1133
1134         outdata.dptr = (void *)recdata;
1135         outdata.dsize = params.len;
1136
1137         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1138         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1139                                         nodes, 0,
1140                                         CONTROL_TIMEOUT(), false, outdata,
1141                                         NULL, NULL,
1142                                         NULL) != 0) {
1143                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1144                 talloc_free(recdata);
1145                 talloc_free(tmp_ctx);
1146                 return -1;
1147         }
1148
1149         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1150                   dbid, recdata->count));
1151
1152         talloc_free(recdata);
1153         talloc_free(tmp_ctx);
1154
1155         return 0;
1156 }
1157
1158
1159 /*
1160   go through a full recovery on one database 
1161  */
1162 static int recover_database(struct ctdb_recoverd *rec, 
1163                             TALLOC_CTX *mem_ctx,
1164                             uint32_t dbid,
1165                             bool persistent,
1166                             uint32_t pnn, 
1167                             struct ctdb_node_map *nodemap,
1168                             uint32_t transaction_id)
1169 {
1170         struct tdb_wrap *recdb;
1171         int ret;
1172         struct ctdb_context *ctdb = rec->ctdb;
1173         TDB_DATA data;
1174         struct ctdb_control_wipe_database w;
1175         uint32_t *nodes;
1176
1177         recdb = create_recdb(ctdb, mem_ctx);
1178         if (recdb == NULL) {
1179                 return -1;
1180         }
1181
1182         /* pull all remote databases onto the recdb */
1183         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1184         if (ret != 0) {
1185                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1186                 return -1;
1187         }
1188
1189         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1190
1191         /* wipe all the remote databases. This is safe as we are in a transaction */
1192         w.db_id = dbid;
1193         w.transaction_id = transaction_id;
1194
1195         data.dptr = (void *)&w;
1196         data.dsize = sizeof(w);
1197
1198         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1199         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1200                                         nodes, 0,
1201                                         CONTROL_TIMEOUT(), false, data,
1202                                         NULL, NULL,
1203                                         NULL) != 0) {
1204                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1205                 talloc_free(recdb);
1206                 return -1;
1207         }
1208         
1209         /* push out the correct database. This sets the dmaster and skips 
1210            the empty records */
1211         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1212         if (ret != 0) {
1213                 talloc_free(recdb);
1214                 return -1;
1215         }
1216
1217         /* all done with this database */
1218         talloc_free(recdb);
1219
1220         return 0;
1221 }
1222
1223 /*
1224   reload the nodes file 
1225 */
1226 static void reload_nodes_file(struct ctdb_context *ctdb)
1227 {
1228         ctdb->nodes = NULL;
1229         ctdb_load_nodes_file(ctdb);
1230 }
1231
1232 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1233                                          struct ctdb_recoverd *rec,
1234                                          struct ctdb_node_map *nodemap,
1235                                          uint32_t *culprit)
1236 {
1237         int j;
1238         int ret;
1239
1240         if (ctdb->num_nodes != nodemap->num) {
1241                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1242                                   ctdb->num_nodes, nodemap->num));
1243                 if (culprit) {
1244                         *culprit = ctdb->pnn;
1245                 }
1246                 return -1;
1247         }
1248
1249         for (j=0; j<nodemap->num; j++) {
1250                 /* release any existing data */
1251                 if (ctdb->nodes[j]->known_public_ips) {
1252                         talloc_free(ctdb->nodes[j]->known_public_ips);
1253                         ctdb->nodes[j]->known_public_ips = NULL;
1254                 }
1255                 if (ctdb->nodes[j]->available_public_ips) {
1256                         talloc_free(ctdb->nodes[j]->available_public_ips);
1257                         ctdb->nodes[j]->available_public_ips = NULL;
1258                 }
1259
1260                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1261                         continue;
1262                 }
1263
1264                 /* grab a new shiny list of public ips from the node */
1265                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1266                                         CONTROL_TIMEOUT(),
1267                                         ctdb->nodes[j]->pnn,
1268                                         ctdb->nodes,
1269                                         0,
1270                                         &ctdb->nodes[j]->known_public_ips);
1271                 if (ret != 0) {
1272                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1273                                 ctdb->nodes[j]->pnn));
1274                         if (culprit) {
1275                                 *culprit = ctdb->nodes[j]->pnn;
1276                         }
1277                         return -1;
1278                 }
1279
1280                 if (rec->ip_check_disable_ctx == NULL) {
1281                         if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1282                                 DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1283                                 rec->need_takeover_run = true;
1284                         }
1285                 }
1286
1287                 /* grab a new shiny list of public ips from the node */
1288                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1289                                         CONTROL_TIMEOUT(),
1290                                         ctdb->nodes[j]->pnn,
1291                                         ctdb->nodes,
1292                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1293                                         &ctdb->nodes[j]->available_public_ips);
1294                 if (ret != 0) {
1295                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1296                                 ctdb->nodes[j]->pnn));
1297                         if (culprit) {
1298                                 *culprit = ctdb->nodes[j]->pnn;
1299                         }
1300                         return -1;
1301                 }
1302         }
1303
1304         return 0;
1305 }
1306
1307 /* when we start a recovery, make sure all nodes use the same reclock file
1308    setting
1309 */
1310 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1311 {
1312         struct ctdb_context *ctdb = rec->ctdb;
1313         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1314         TDB_DATA data;
1315         uint32_t *nodes;
1316
1317         if (ctdb->recovery_lock_file == NULL) {
1318                 data.dptr  = NULL;
1319                 data.dsize = 0;
1320         } else {
1321                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1322                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1323         }
1324
1325         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1326         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1327                                         nodes, 0,
1328                                         CONTROL_TIMEOUT(),
1329                                         false, data,
1330                                         NULL, NULL,
1331                                         rec) != 0) {
1332                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1333                 talloc_free(tmp_ctx);
1334                 return -1;
1335         }
1336
1337         talloc_free(tmp_ctx);
1338         return 0;
1339 }
1340
1341
1342 /*
1343   we are the recmaster, and recovery is needed - start a recovery run
1344  */
1345 static int do_recovery(struct ctdb_recoverd *rec, 
1346                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1347                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1348 {
1349         struct ctdb_context *ctdb = rec->ctdb;
1350         int i, j, ret;
1351         uint32_t generation;
1352         struct ctdb_dbid_map *dbmap;
1353         TDB_DATA data;
1354         uint32_t *nodes;
1355         struct timeval start_time;
1356         uint32_t culprit = (uint32_t)-1;
1357
1358         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1359
1360         /* if recovery fails, force it again */
1361         rec->need_recovery = true;
1362
1363         for (i=0; i<ctdb->num_nodes; i++) {
1364                 struct ctdb_banning_state *ban_state;
1365
1366                 if (ctdb->nodes[i]->ban_state == NULL) {
1367                         continue;
1368                 }
1369                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1370                 if (ban_state->count < 2*ctdb->num_nodes) {
1371                         continue;
1372                 }
1373                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1374                         ctdb->nodes[i]->pnn, ban_state->count,
1375                         ctdb->tunable.recovery_ban_period));
1376                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1377                 ban_state->count = 0;
1378         }
1379
1380
1381         if (ctdb->tunable.verify_recovery_lock != 0) {
1382                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1383                 start_time = timeval_current();
1384                 if (!ctdb_recovery_lock(ctdb, true)) {
1385                         ctdb_set_culprit(rec, pnn);
1386                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1387                         return -1;
1388                 }
1389                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1390                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1391         }
1392
1393         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1394
1395         /* get a list of all databases */
1396         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1397         if (ret != 0) {
1398                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1399                 return -1;
1400         }
1401
1402         /* we do the db creation before we set the recovery mode, so the freeze happens
1403            on all databases we will be dealing with. */
1404
1405         /* verify that we have all the databases any other node has */
1406         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1407         if (ret != 0) {
1408                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1409                 return -1;
1410         }
1411
1412         /* verify that all other nodes have all our databases */
1413         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1414         if (ret != 0) {
1415                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1416                 return -1;
1417         }
1418         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1419
1420         /* update the database priority for all remote databases */
1421         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1422         if (ret != 0) {
1423                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1424         }
1425         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1426
1427
1428         /* update all other nodes to use the same setting for reclock files
1429            as the local recovery master.
1430         */
1431         sync_recovery_lock_file_across_cluster(rec);
1432
1433         /* set recovery mode to active on all nodes */
1434         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1435         if (ret != 0) {
1436                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1437                 return -1;
1438         }
1439
1440         /* execute the "startrecovery" event script on all nodes */
1441         ret = run_startrecovery_eventscript(rec, nodemap);
1442         if (ret!=0) {
1443                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1444                 return -1;
1445         }
1446
1447         /*
1448           update all nodes to have the same flags that we have
1449          */
1450         for (i=0;i<nodemap->num;i++) {
1451                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1452                         continue;
1453                 }
1454
1455                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1456                 if (ret != 0) {
1457                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1458                         return -1;
1459                 }
1460         }
1461
1462         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1463
1464         /* pick a new generation number */
1465         generation = new_generation();
1466
1467         /* change the vnnmap on this node to use the new generation 
1468            number but not on any other nodes.
1469            this guarantees that if we abort the recovery prematurely
1470            for some reason (a node stops responding?)
1471            that we can just return immediately and we will reenter
1472            recovery shortly again.
1473            I.e. we deliberately leave the cluster with an inconsistent
1474            generation id to allow us to abort recovery at any stage and
1475            just restart it from scratch.
1476          */
1477         vnnmap->generation = generation;
1478         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1479         if (ret != 0) {
1480                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1481                 return -1;
1482         }
1483
1484         data.dptr = (void *)&generation;
1485         data.dsize = sizeof(uint32_t);
1486
1487         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1488         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1489                                         nodes, 0,
1490                                         CONTROL_TIMEOUT(), false, data,
1491                                         NULL,
1492                                         transaction_start_fail_callback,
1493                                         rec) != 0) {
1494                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1495                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1496                                         nodes, 0,
1497                                         CONTROL_TIMEOUT(), false, tdb_null,
1498                                         NULL,
1499                                         NULL,
1500                                         NULL) != 0) {
1501                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1502                 }
1503                 return -1;
1504         }
1505
1506         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1507
1508         for (i=0;i<dbmap->num;i++) {
1509                 ret = recover_database(rec, mem_ctx,
1510                                        dbmap->dbs[i].dbid,
1511                                        dbmap->dbs[i].persistent,
1512                                        pnn, nodemap, generation);
1513                 if (ret != 0) {
1514                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1515                         return -1;
1516                 }
1517         }
1518
1519         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1520
1521         /* commit all the changes */
1522         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1523                                         nodes, 0,
1524                                         CONTROL_TIMEOUT(), false, data,
1525                                         NULL, NULL,
1526                                         NULL) != 0) {
1527                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1528                 return -1;
1529         }
1530
1531         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1532         
1533
1534         /* update the capabilities for all nodes */
1535         ret = update_capabilities(ctdb, nodemap);
1536         if (ret!=0) {
1537                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1538                 return -1;
1539         }
1540
1541         /* build a new vnn map with all the currently active and
1542            unbanned nodes */
1543         generation = new_generation();
1544         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1545         CTDB_NO_MEMORY(ctdb, vnnmap);
1546         vnnmap->generation = generation;
1547         vnnmap->size = 0;
1548         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1549         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1550         for (i=j=0;i<nodemap->num;i++) {
1551                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1552                         continue;
1553                 }
1554                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1555                         /* this node can not be an lmaster */
1556                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1557                         continue;
1558                 }
1559
1560                 vnnmap->size++;
1561                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1562                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1563                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1564
1565         }
1566         if (vnnmap->size == 0) {
1567                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1568                 vnnmap->size++;
1569                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1570                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1571                 vnnmap->map[0] = pnn;
1572         }       
1573
1574         /* update to the new vnnmap on all nodes */
1575         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1576         if (ret != 0) {
1577                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1578                 return -1;
1579         }
1580
1581         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1582
1583         /* update recmaster to point to us for all nodes */
1584         ret = set_recovery_master(ctdb, nodemap, pnn);
1585         if (ret!=0) {
1586                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1587                 return -1;
1588         }
1589
1590         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1591
1592         /*
1593           update all nodes to have the same flags that we have
1594          */
1595         for (i=0;i<nodemap->num;i++) {
1596                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1597                         continue;
1598                 }
1599
1600                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1601                 if (ret != 0) {
1602                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1603                         return -1;
1604                 }
1605         }
1606
1607         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1608
1609         /* disable recovery mode */
1610         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1611         if (ret != 0) {
1612                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1613                 return -1;
1614         }
1615
1616         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1617
1618         /*
1619           tell nodes to takeover their public IPs
1620          */
1621         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1622         if (ret != 0) {
1623                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1624                                  culprit));
1625                 return -1;
1626         }
1627         rec->need_takeover_run = false;
1628         ret = ctdb_takeover_run(ctdb, nodemap);
1629         if (ret != 0) {
1630                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1631                 return -1;
1632         }
1633         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1634
1635         /* execute the "recovered" event script on all nodes */
1636         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1637         if (ret!=0) {
1638                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1639                 return -1;
1640         }
1641
1642         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1643
1644         /* send a message to all clients telling them that the cluster 
1645            has been reconfigured */
1646         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1647
1648         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1649
1650         rec->need_recovery = false;
1651
1652         /* we managed to complete a full recovery, make sure to forgive
1653            any past sins by the nodes that could now participate in the
1654            recovery.
1655         */
1656         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1657         for (i=0;i<nodemap->num;i++) {
1658                 struct ctdb_banning_state *ban_state;
1659
1660                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1661                         continue;
1662                 }
1663
1664                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1665                 if (ban_state == NULL) {
1666                         continue;
1667                 }
1668
1669                 ban_state->count = 0;
1670         }
1671
1672
1673         /* We just finished a recovery successfully. 
1674            We now wait for rerecovery_timeout before we allow 
1675            another recovery to take place.
1676         */
1677         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1678         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1679         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1680
1681         return 0;
1682 }
1683
1684
1685 /*
1686   elections are won by first checking the number of connected nodes, then
1687   the priority time, then the pnn
1688  */
1689 struct election_message {
1690         uint32_t num_connected;
1691         struct timeval priority_time;
1692         uint32_t pnn;
1693         uint32_t node_flags;
1694 };
1695
1696 /*
1697   form this nodes election data
1698  */
1699 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1700 {
1701         int ret, i;
1702         struct ctdb_node_map *nodemap;
1703         struct ctdb_context *ctdb = rec->ctdb;
1704
1705         ZERO_STRUCTP(em);
1706
1707         em->pnn = rec->ctdb->pnn;
1708         em->priority_time = rec->priority_time;
1709
1710         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1711         if (ret != 0) {
1712                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1713                 return;
1714         }
1715
1716         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1717         em->node_flags = rec->node_flags;
1718
1719         for (i=0;i<nodemap->num;i++) {
1720                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1721                         em->num_connected++;
1722                 }
1723         }
1724
1725         /* we shouldnt try to win this election if we cant be a recmaster */
1726         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1727                 em->num_connected = 0;
1728                 em->priority_time = timeval_current();
1729         }
1730
1731         talloc_free(nodemap);
1732 }
1733
1734 /*
1735   see if the given election data wins
1736  */
1737 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1738 {
1739         struct election_message myem;
1740         int cmp = 0;
1741
1742         ctdb_election_data(rec, &myem);
1743
1744         /* we cant win if we dont have the recmaster capability */
1745         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1746                 return false;
1747         }
1748
1749         /* we cant win if we are banned */
1750         if (rec->node_flags & NODE_FLAGS_BANNED) {
1751                 return false;
1752         }       
1753
1754         /* we cant win if we are stopped */
1755         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1756                 return false;
1757         }       
1758
1759         /* we will automatically win if the other node is banned */
1760         if (em->node_flags & NODE_FLAGS_BANNED) {
1761                 return true;
1762         }
1763
1764         /* we will automatically win if the other node is banned */
1765         if (em->node_flags & NODE_FLAGS_STOPPED) {
1766                 return true;
1767         }
1768
1769         /* try to use the most connected node */
1770         if (cmp == 0) {
1771                 cmp = (int)myem.num_connected - (int)em->num_connected;
1772         }
1773
1774         /* then the longest running node */
1775         if (cmp == 0) {
1776                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1777         }
1778
1779         if (cmp == 0) {
1780                 cmp = (int)myem.pnn - (int)em->pnn;
1781         }
1782
1783         return cmp > 0;
1784 }
1785
1786 /*
1787   send out an election request
1788  */
1789 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1790 {
1791         int ret;
1792         TDB_DATA election_data;
1793         struct election_message emsg;
1794         uint64_t srvid;
1795         struct ctdb_context *ctdb = rec->ctdb;
1796
1797         srvid = CTDB_SRVID_RECOVERY;
1798
1799         ctdb_election_data(rec, &emsg);
1800
1801         election_data.dsize = sizeof(struct election_message);
1802         election_data.dptr  = (unsigned char *)&emsg;
1803
1804
1805         /* send an election message to all active nodes */
1806         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1807         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1808
1809
1810         /* A new node that is already frozen has entered the cluster.
1811            The existing nodes are not frozen and dont need to be frozen
1812            until the election has ended and we start the actual recovery
1813         */
1814         if (update_recmaster == true) {
1815                 /* first we assume we will win the election and set 
1816                    recoverymaster to be ourself on the current node
1817                  */
1818                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1819                 if (ret != 0) {
1820                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1821                         return -1;
1822                 }
1823         }
1824
1825
1826         return 0;
1827 }
1828
1829 /*
1830   this function will unban all nodes in the cluster
1831 */
1832 static void unban_all_nodes(struct ctdb_context *ctdb)
1833 {
1834         int ret, i;
1835         struct ctdb_node_map *nodemap;
1836         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1837         
1838         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1839         if (ret != 0) {
1840                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1841                 return;
1842         }
1843
1844         for (i=0;i<nodemap->num;i++) {
1845                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1846                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1847                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1848                 }
1849         }
1850
1851         talloc_free(tmp_ctx);
1852 }
1853
1854
1855 /*
1856   we think we are winning the election - send a broadcast election request
1857  */
1858 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1859 {
1860         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1861         int ret;
1862
1863         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1864         if (ret != 0) {
1865                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1866         }
1867
1868         talloc_free(rec->send_election_te);
1869         rec->send_election_te = NULL;
1870 }
1871
1872 /*
1873   handler for memory dumps
1874 */
1875 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1876                              TDB_DATA data, void *private_data)
1877 {
1878         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1879         TDB_DATA *dump;
1880         int ret;
1881         struct rd_memdump_reply *rd;
1882
1883         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1884                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1885                 talloc_free(tmp_ctx);
1886                 return;
1887         }
1888         rd = (struct rd_memdump_reply *)data.dptr;
1889
1890         dump = talloc_zero(tmp_ctx, TDB_DATA);
1891         if (dump == NULL) {
1892                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1893                 talloc_free(tmp_ctx);
1894                 return;
1895         }
1896         ret = ctdb_dump_memory(ctdb, dump);
1897         if (ret != 0) {
1898                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1899                 talloc_free(tmp_ctx);
1900                 return;
1901         }
1902
1903 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1904
1905         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1906         if (ret != 0) {
1907                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1908                 talloc_free(tmp_ctx);
1909                 return;
1910         }
1911
1912         talloc_free(tmp_ctx);
1913 }
1914
1915 /*
1916   handler for reload_nodes
1917 */
1918 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1919                              TDB_DATA data, void *private_data)
1920 {
1921         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1922
1923         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1924
1925         reload_nodes_file(rec->ctdb);
1926 }
1927
1928
1929 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1930                               struct timeval yt, void *p)
1931 {
1932         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1933
1934         talloc_free(rec->ip_check_disable_ctx);
1935         rec->ip_check_disable_ctx = NULL;
1936 }
1937
1938
1939 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1940                              TDB_DATA data, void *private_data)
1941 {
1942         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1943         struct ctdb_public_ip *ip;
1944
1945         if (rec->recmaster != rec->ctdb->pnn) {
1946                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1947                 return;
1948         }
1949
1950         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1951                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1952                 return;
1953         }
1954
1955         ip = (struct ctdb_public_ip *)data.dptr;
1956
1957         update_ip_assignment_tree(rec->ctdb, ip);
1958 }
1959
1960
1961 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1962                              TDB_DATA data, void *private_data)
1963 {
1964         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1965         uint32_t timeout;
1966
1967         if (rec->ip_check_disable_ctx != NULL) {
1968                 talloc_free(rec->ip_check_disable_ctx);
1969                 rec->ip_check_disable_ctx = NULL;
1970         }
1971
1972         if (data.dsize != sizeof(uint32_t)) {
1973                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1974                                  "expexting %lu\n", (long unsigned)data.dsize,
1975                                  (long unsigned)sizeof(uint32_t)));
1976                 return;
1977         }
1978         if (data.dptr == NULL) {
1979                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
1980                 return;
1981         }
1982
1983         timeout = *((uint32_t *)data.dptr);
1984         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1985
1986         rec->ip_check_disable_ctx = talloc_new(rec);
1987         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1988
1989         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1990 }
1991
1992
1993 /*
1994   handler for ip reallocate, just add it to the list of callers and 
1995   handle this later in the monitor_cluster loop so we do not recurse
1996   with other callers to takeover_run()
1997 */
1998 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1999                              TDB_DATA data, void *private_data)
2000 {
2001         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2002         struct ip_reallocate_list *caller;
2003
2004         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2005                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2006                 return;
2007         }
2008
2009         if (rec->ip_reallocate_ctx == NULL) {
2010                 rec->ip_reallocate_ctx = talloc_new(rec);
2011                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2012         }
2013
2014         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2015         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2016
2017         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2018         caller->next = rec->reallocate_callers;
2019         rec->reallocate_callers = caller;
2020
2021         return;
2022 }
2023
2024 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2025 {
2026         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2027         TDB_DATA result;
2028         int32_t ret;
2029         struct ip_reallocate_list *callers;
2030         uint32_t culprit;
2031
2032         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2033
2034         /* update the list of public ips that a node can handle for
2035            all connected nodes
2036         */
2037         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2038         if (ret != 0) {
2039                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2040                                  culprit));
2041                 rec->need_takeover_run = true;
2042         }
2043         if (ret == 0) {
2044                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2045                 if (ret != 0) {
2046                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2047                                          culprit));
2048                         rec->need_takeover_run = true;
2049                 }
2050         }
2051
2052         result.dsize = sizeof(int32_t);
2053         result.dptr  = (uint8_t *)&ret;
2054
2055         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2056
2057                 /* Someone that sent srvid==0 does not want a reply */
2058                 if (callers->rd->srvid == 0) {
2059                         continue;
2060                 }
2061                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2062                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2063                                   (unsigned long long)callers->rd->srvid));
2064                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2065                 if (ret != 0) {
2066                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2067                                          "message to %u:%llu\n",
2068                                          (unsigned)callers->rd->pnn,
2069                                          (unsigned long long)callers->rd->srvid));
2070                 }
2071         }
2072
2073         talloc_free(tmp_ctx);
2074         talloc_free(rec->ip_reallocate_ctx);
2075         rec->ip_reallocate_ctx = NULL;
2076         rec->reallocate_callers = NULL;
2077         
2078 }
2079
2080
2081 /*
2082   handler for recovery master elections
2083 */
2084 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2085                              TDB_DATA data, void *private_data)
2086 {
2087         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2088         int ret;
2089         struct election_message *em = (struct election_message *)data.dptr;
2090         TALLOC_CTX *mem_ctx;
2091
2092         /* we got an election packet - update the timeout for the election */
2093         talloc_free(rec->election_timeout);
2094         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2095                                                 fast_start ?
2096                                                 timeval_current_ofs(0, 500000) :
2097                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2098                                                 ctdb_election_timeout, rec);
2099
2100         mem_ctx = talloc_new(ctdb);
2101
2102         /* someone called an election. check their election data
2103            and if we disagree and we would rather be the elected node, 
2104            send a new election message to all other nodes
2105          */
2106         if (ctdb_election_win(rec, em)) {
2107                 if (!rec->send_election_te) {
2108                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2109                                                                 timeval_current_ofs(0, 500000),
2110                                                                 election_send_request, rec);
2111                 }
2112                 talloc_free(mem_ctx);
2113                 /*unban_all_nodes(ctdb);*/
2114                 return;
2115         }
2116         
2117         /* we didn't win */
2118         talloc_free(rec->send_election_te);
2119         rec->send_election_te = NULL;
2120
2121         if (ctdb->tunable.verify_recovery_lock != 0) {
2122                 /* release the recmaster lock */
2123                 if (em->pnn != ctdb->pnn &&
2124                     ctdb->recovery_lock_fd != -1) {
2125                         close(ctdb->recovery_lock_fd);
2126                         ctdb->recovery_lock_fd = -1;
2127                         unban_all_nodes(ctdb);
2128                 }
2129         }
2130
2131         /* ok, let that guy become recmaster then */
2132         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2133         if (ret != 0) {
2134                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2135                 talloc_free(mem_ctx);
2136                 return;
2137         }
2138
2139         talloc_free(mem_ctx);
2140         return;
2141 }
2142
2143
2144 /*
2145   force the start of the election process
2146  */
2147 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2148                            struct ctdb_node_map *nodemap)
2149 {
2150         int ret;
2151         struct ctdb_context *ctdb = rec->ctdb;
2152
2153         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2154
2155         /* set all nodes to recovery mode to stop all internode traffic */
2156         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2157         if (ret != 0) {
2158                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2159                 return;
2160         }
2161
2162         talloc_free(rec->election_timeout);
2163         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2164                                                 fast_start ?
2165                                                 timeval_current_ofs(0, 500000) :
2166                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2167                                                 ctdb_election_timeout, rec);
2168
2169         ret = send_election_request(rec, pnn, true);
2170         if (ret!=0) {
2171                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2172                 return;
2173         }
2174
2175         /* wait for a few seconds to collect all responses */
2176         ctdb_wait_election(rec);
2177 }
2178
2179
2180
2181 /*
2182   handler for when a node changes its flags
2183 */
2184 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2185                             TDB_DATA data, void *private_data)
2186 {
2187         int ret;
2188         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2189         struct ctdb_node_map *nodemap=NULL;
2190         TALLOC_CTX *tmp_ctx;
2191         uint32_t changed_flags;
2192         int i;
2193         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2194         int disabled_flag_changed;
2195
2196         if (data.dsize != sizeof(*c)) {
2197                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2198                 return;
2199         }
2200
2201         tmp_ctx = talloc_new(ctdb);
2202         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2203
2204         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2205         if (ret != 0) {
2206                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2207                 talloc_free(tmp_ctx);
2208                 return;         
2209         }
2210
2211
2212         for (i=0;i<nodemap->num;i++) {
2213                 if (nodemap->nodes[i].pnn == c->pnn) break;
2214         }
2215
2216         if (i == nodemap->num) {
2217                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2218                 talloc_free(tmp_ctx);
2219                 return;
2220         }
2221
2222         changed_flags = c->old_flags ^ c->new_flags;
2223
2224         if (nodemap->nodes[i].flags != c->new_flags) {
2225                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2226         }
2227
2228         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2229
2230         nodemap->nodes[i].flags = c->new_flags;
2231
2232         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2233                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2234
2235         if (ret == 0) {
2236                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2237                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2238         }
2239         
2240         if (ret == 0 &&
2241             ctdb->recovery_master == ctdb->pnn &&
2242             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2243                 /* Only do the takeover run if the perm disabled or unhealthy
2244                    flags changed since these will cause an ip failover but not
2245                    a recovery.
2246                    If the node became disconnected or banned this will also
2247                    lead to an ip address failover but that is handled 
2248                    during recovery
2249                 */
2250                 if (disabled_flag_changed) {
2251                         rec->need_takeover_run = true;
2252                 }
2253         }
2254
2255         talloc_free(tmp_ctx);
2256 }
2257
2258 /*
2259   handler for when we need to push out flag changes ot all other nodes
2260 */
2261 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2262                             TDB_DATA data, void *private_data)
2263 {
2264         int ret;
2265         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2266         struct ctdb_node_map *nodemap=NULL;
2267         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2268         uint32_t recmaster;
2269         uint32_t *nodes;
2270
2271         /* find the recovery master */
2272         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2273         if (ret != 0) {
2274                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2275                 talloc_free(tmp_ctx);
2276                 return;
2277         }
2278
2279         /* read the node flags from the recmaster */
2280         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2281         if (ret != 0) {
2282                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2283                 talloc_free(tmp_ctx);
2284                 return;
2285         }
2286         if (c->pnn >= nodemap->num) {
2287                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2288                 talloc_free(tmp_ctx);
2289                 return;
2290         }
2291
2292         /* send the flags update to all connected nodes */
2293         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2294
2295         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2296                                       nodes, 0, CONTROL_TIMEOUT(),
2297                                       false, data,
2298                                       NULL, NULL,
2299                                       NULL) != 0) {
2300                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2301
2302                 talloc_free(tmp_ctx);
2303                 return;
2304         }
2305
2306         talloc_free(tmp_ctx);
2307 }
2308
2309
2310 struct verify_recmode_normal_data {
2311         uint32_t count;
2312         enum monitor_result status;
2313 };
2314
2315 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2316 {
2317         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2318
2319
2320         /* one more node has responded with recmode data*/
2321         rmdata->count--;
2322
2323         /* if we failed to get the recmode, then return an error and let
2324            the main loop try again.
2325         */
2326         if (state->state != CTDB_CONTROL_DONE) {
2327                 if (rmdata->status == MONITOR_OK) {
2328                         rmdata->status = MONITOR_FAILED;
2329                 }
2330                 return;
2331         }
2332
2333         /* if we got a response, then the recmode will be stored in the
2334            status field
2335         */
2336         if (state->status != CTDB_RECOVERY_NORMAL) {
2337                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2338                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2339         }
2340
2341         return;
2342 }
2343
2344
2345 /* verify that all nodes are in normal recovery mode */
2346 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2347 {
2348         struct verify_recmode_normal_data *rmdata;
2349         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2350         struct ctdb_client_control_state *state;
2351         enum monitor_result status;
2352         int j;
2353         
2354         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2355         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2356         rmdata->count  = 0;
2357         rmdata->status = MONITOR_OK;
2358
2359         /* loop over all active nodes and send an async getrecmode call to 
2360            them*/
2361         for (j=0; j<nodemap->num; j++) {
2362                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2363                         continue;
2364                 }
2365                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2366                                         CONTROL_TIMEOUT(), 
2367                                         nodemap->nodes[j].pnn);
2368                 if (state == NULL) {
2369                         /* we failed to send the control, treat this as 
2370                            an error and try again next iteration
2371                         */                      
2372                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2373                         talloc_free(mem_ctx);
2374                         return MONITOR_FAILED;
2375                 }
2376
2377                 /* set up the callback functions */
2378                 state->async.fn = verify_recmode_normal_callback;
2379                 state->async.private_data = rmdata;
2380
2381                 /* one more control to wait for to complete */
2382                 rmdata->count++;
2383         }
2384
2385
2386         /* now wait for up to the maximum number of seconds allowed
2387            or until all nodes we expect a response from has replied
2388         */
2389         while (rmdata->count > 0) {
2390                 event_loop_once(ctdb->ev);
2391         }
2392
2393         status = rmdata->status;
2394         talloc_free(mem_ctx);
2395         return status;
2396 }
2397
2398
2399 struct verify_recmaster_data {
2400         struct ctdb_recoverd *rec;
2401         uint32_t count;
2402         uint32_t pnn;
2403         enum monitor_result status;
2404 };
2405
2406 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2407 {
2408         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2409
2410
2411         /* one more node has responded with recmaster data*/
2412         rmdata->count--;
2413
2414         /* if we failed to get the recmaster, then return an error and let
2415            the main loop try again.
2416         */
2417         if (state->state != CTDB_CONTROL_DONE) {
2418                 if (rmdata->status == MONITOR_OK) {
2419                         rmdata->status = MONITOR_FAILED;
2420                 }
2421                 return;
2422         }
2423
2424         /* if we got a response, then the recmaster will be stored in the
2425            status field
2426         */
2427         if (state->status != rmdata->pnn) {
2428                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2429                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2430                 rmdata->status = MONITOR_ELECTION_NEEDED;
2431         }
2432
2433         return;
2434 }
2435
2436
2437 /* verify that all nodes agree that we are the recmaster */
2438 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2439 {
2440         struct ctdb_context *ctdb = rec->ctdb;
2441         struct verify_recmaster_data *rmdata;
2442         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2443         struct ctdb_client_control_state *state;
2444         enum monitor_result status;
2445         int j;
2446         
2447         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2448         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2449         rmdata->rec    = rec;
2450         rmdata->count  = 0;
2451         rmdata->pnn    = pnn;
2452         rmdata->status = MONITOR_OK;
2453
2454         /* loop over all active nodes and send an async getrecmaster call to 
2455            them*/
2456         for (j=0; j<nodemap->num; j++) {
2457                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2458                         continue;
2459                 }
2460                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2461                                         CONTROL_TIMEOUT(),
2462                                         nodemap->nodes[j].pnn);
2463                 if (state == NULL) {
2464                         /* we failed to send the control, treat this as 
2465                            an error and try again next iteration
2466                         */                      
2467                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2468                         talloc_free(mem_ctx);
2469                         return MONITOR_FAILED;
2470                 }
2471
2472                 /* set up the callback functions */
2473                 state->async.fn = verify_recmaster_callback;
2474                 state->async.private_data = rmdata;
2475
2476                 /* one more control to wait for to complete */
2477                 rmdata->count++;
2478         }
2479
2480
2481         /* now wait for up to the maximum number of seconds allowed
2482            or until all nodes we expect a response from has replied
2483         */
2484         while (rmdata->count > 0) {
2485                 event_loop_once(ctdb->ev);
2486         }
2487
2488         status = rmdata->status;
2489         talloc_free(mem_ctx);
2490         return status;
2491 }
2492
2493
2494 /* called to check that the local allocation of public ip addresses is ok.
2495 */
2496 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2497 {
2498         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2499         struct ctdb_control_get_ifaces *ifaces = NULL;
2500         struct ctdb_all_public_ips *ips = NULL;
2501         struct ctdb_uptime *uptime1 = NULL;
2502         struct ctdb_uptime *uptime2 = NULL;
2503         int ret, j;
2504         bool need_iface_check = false;
2505         bool need_takeover_run = false;
2506
2507         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2508                                 CTDB_CURRENT_NODE, &uptime1);
2509         if (ret != 0) {
2510                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2511                 talloc_free(mem_ctx);
2512                 return -1;
2513         }
2514
2515
2516         /* read the interfaces from the local node */
2517         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2518         if (ret != 0) {
2519                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2520                 talloc_free(mem_ctx);
2521                 return -1;
2522         }
2523
2524         if (!rec->ifaces) {
2525                 need_iface_check = true;
2526         } else if (rec->ifaces->num != ifaces->num) {
2527                 need_iface_check = true;
2528         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2529                 need_iface_check = true;
2530         }
2531
2532         if (need_iface_check) {
2533                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2534                                      "local node %u - force takeover run\n",
2535                                      pnn));
2536                 need_takeover_run = true;
2537         }
2538
2539         /* read the ip allocation from the local node */
2540         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2541         if (ret != 0) {
2542                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2543                 talloc_free(mem_ctx);
2544                 return -1;
2545         }
2546
2547         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2548                                 CTDB_CURRENT_NODE, &uptime2);
2549         if (ret != 0) {
2550                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2551                 talloc_free(mem_ctx);
2552                 return -1;
2553         }
2554
2555         /* skip the check if the startrecovery time has changed */
2556         if (timeval_compare(&uptime1->last_recovery_started,
2557                             &uptime2->last_recovery_started) != 0) {
2558                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2559                 talloc_free(mem_ctx);
2560                 return 0;
2561         }
2562
2563         /* skip the check if the endrecovery time has changed */
2564         if (timeval_compare(&uptime1->last_recovery_finished,
2565                             &uptime2->last_recovery_finished) != 0) {
2566                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2567                 talloc_free(mem_ctx);
2568                 return 0;
2569         }
2570
2571         /* skip the check if we have started but not finished recovery */
2572         if (timeval_compare(&uptime1->last_recovery_finished,
2573                             &uptime1->last_recovery_started) != 1) {
2574                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2575                 talloc_free(mem_ctx);
2576
2577                 return 0;
2578         }
2579
2580         talloc_free(rec->ifaces);
2581         rec->ifaces = talloc_steal(rec, ifaces);
2582
2583         /* verify that we have the ip addresses we should have
2584            and we dont have ones we shouldnt have.
2585            if we find an inconsistency we set recmode to
2586            active on the local node and wait for the recmaster
2587            to do a full blown recovery
2588         */
2589         if (ctdb->tunable.disable_ip_failover == 0) {
2590                 for (j=0; j<ips->num; j++) {
2591                         if (ips->ips[j].pnn == pnn) {
2592                                 if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2593                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2594                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2595                                         need_takeover_run = true;
2596                                 }
2597                         } else {
2598                                 if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2599                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2600                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2601                                         need_takeover_run = true;
2602                                 }
2603                         }
2604                 }
2605         }
2606
2607         if (need_takeover_run) {
2608                 struct takeover_run_reply rd;
2609                 TDB_DATA data;
2610
2611                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2612
2613                 rd.pnn = ctdb->pnn;
2614                 rd.srvid = 0;
2615                 data.dptr = (uint8_t *)&rd;
2616                 data.dsize = sizeof(rd);
2617
2618                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2619                 if (ret != 0) {
2620                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2621                 }
2622         }
2623         talloc_free(mem_ctx);
2624         return 0;
2625 }
2626
2627
2628 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2629 {
2630         struct ctdb_node_map **remote_nodemaps = callback_data;
2631
2632         if (node_pnn >= ctdb->num_nodes) {
2633                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2634                 return;
2635         }
2636
2637         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2638
2639 }
2640
2641 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2642         struct ctdb_node_map *nodemap,
2643         struct ctdb_node_map **remote_nodemaps)
2644 {
2645         uint32_t *nodes;
2646
2647         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2648         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2649                                         nodes, 0,
2650                                         CONTROL_TIMEOUT(), false, tdb_null,
2651                                         async_getnodemap_callback,
2652                                         NULL,
2653                                         remote_nodemaps) != 0) {
2654                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2655
2656                 return -1;
2657         }
2658
2659         return 0;
2660 }
2661
2662 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2663 struct ctdb_check_reclock_state {
2664         struct ctdb_context *ctdb;
2665         struct timeval start_time;
2666         int fd[2];
2667         pid_t child;
2668         struct timed_event *te;
2669         struct fd_event *fde;
2670         enum reclock_child_status status;
2671 };
2672
2673 /* when we free the reclock state we must kill any child process.
2674 */
2675 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2676 {
2677         struct ctdb_context *ctdb = state->ctdb;
2678
2679         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2680
2681         if (state->fd[0] != -1) {
2682                 close(state->fd[0]);
2683                 state->fd[0] = -1;
2684         }
2685         if (state->fd[1] != -1) {
2686                 close(state->fd[1]);
2687                 state->fd[1] = -1;
2688         }
2689         kill(state->child, SIGKILL);
2690         return 0;
2691 }
2692
2693 /*
2694   called if our check_reclock child times out. this would happen if
2695   i/o to the reclock file blocks.
2696  */
2697 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2698                                          struct timeval t, void *private_data)
2699 {
2700         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2701                                            struct ctdb_check_reclock_state);
2702
2703         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2704         state->status = RECLOCK_TIMEOUT;
2705 }
2706
2707 /* this is called when the child process has completed checking the reclock
2708    file and has written data back to us through the pipe.
2709 */
2710 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2711                              uint16_t flags, void *private_data)
2712 {
2713         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2714                                              struct ctdb_check_reclock_state);
2715         char c = 0;
2716         int ret;
2717
2718         /* we got a response from our child process so we can abort the
2719            timeout.
2720         */
2721         talloc_free(state->te);
2722         state->te = NULL;
2723
2724         ret = read(state->fd[0], &c, 1);
2725         if (ret != 1 || c != RECLOCK_OK) {
2726                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2727                 state->status = RECLOCK_FAILED;
2728
2729                 return;
2730         }
2731
2732         state->status = RECLOCK_OK;
2733         return;
2734 }
2735
2736 static int check_recovery_lock(struct ctdb_context *ctdb)
2737 {
2738         int ret;
2739         struct ctdb_check_reclock_state *state;
2740         pid_t parent = getpid();
2741
2742         if (ctdb->recovery_lock_fd == -1) {
2743                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2744                 return -1;
2745         }
2746
2747         state = talloc(ctdb, struct ctdb_check_reclock_state);
2748         CTDB_NO_MEMORY(ctdb, state);
2749
2750         state->ctdb = ctdb;
2751         state->start_time = timeval_current();
2752         state->status = RECLOCK_CHECKING;
2753         state->fd[0] = -1;
2754         state->fd[1] = -1;
2755
2756         ret = pipe(state->fd);
2757         if (ret != 0) {
2758                 talloc_free(state);
2759                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2760                 return -1;
2761         }
2762
2763         state->child = fork();
2764         if (state->child == (pid_t)-1) {
2765                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2766                 close(state->fd[0]);
2767                 state->fd[0] = -1;
2768                 close(state->fd[1]);
2769                 state->fd[1] = -1;
2770                 talloc_free(state);
2771                 return -1;
2772         }
2773
2774         if (state->child == 0) {
2775                 char cc = RECLOCK_OK;
2776                 close(state->fd[0]);
2777                 state->fd[0] = -1;
2778
2779                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
2780                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2781                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2782                         cc = RECLOCK_FAILED;
2783                 }
2784
2785                 write(state->fd[1], &cc, 1);
2786                 /* make sure we die when our parent dies */
2787                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2788                         sleep(5);
2789                         write(state->fd[1], &cc, 1);
2790                 }
2791                 _exit(0);
2792         }
2793         close(state->fd[1]);
2794         state->fd[1] = -1;
2795         set_close_on_exec(state->fd[0]);
2796
2797         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2798
2799         talloc_set_destructor(state, check_reclock_destructor);
2800
2801         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2802                                     ctdb_check_reclock_timeout, state);
2803         if (state->te == NULL) {
2804                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2805                 talloc_free(state);
2806                 return -1;
2807         }
2808
2809         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2810                                 EVENT_FD_READ,
2811                                 reclock_child_handler,
2812                                 (void *)state);
2813
2814         if (state->fde == NULL) {
2815                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2816                 talloc_free(state);
2817                 return -1;
2818         }
2819         tevent_fd_set_auto_close(state->fde);
2820
2821         while (state->status == RECLOCK_CHECKING) {
2822                 event_loop_once(ctdb->ev);
2823         }
2824
2825         if (state->status == RECLOCK_FAILED) {
2826                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2827                 close(ctdb->recovery_lock_fd);
2828                 ctdb->recovery_lock_fd = -1;
2829                 talloc_free(state);
2830                 return -1;
2831         }
2832
2833         talloc_free(state);
2834         return 0;
2835 }
2836
2837 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2838 {
2839         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2840         const char *reclockfile;
2841
2842         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2843                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2844                 talloc_free(tmp_ctx);
2845                 return -1;      
2846         }
2847
2848         if (reclockfile == NULL) {
2849                 if (ctdb->recovery_lock_file != NULL) {
2850                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2851                         talloc_free(ctdb->recovery_lock_file);
2852                         ctdb->recovery_lock_file = NULL;
2853                         if (ctdb->recovery_lock_fd != -1) {
2854                                 close(ctdb->recovery_lock_fd);
2855                                 ctdb->recovery_lock_fd = -1;
2856                         }
2857                 }
2858                 ctdb->tunable.verify_recovery_lock = 0;
2859                 talloc_free(tmp_ctx);
2860                 return 0;
2861         }
2862
2863         if (ctdb->recovery_lock_file == NULL) {
2864                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2865                 if (ctdb->recovery_lock_fd != -1) {
2866                         close(ctdb->recovery_lock_fd);
2867                         ctdb->recovery_lock_fd = -1;
2868                 }
2869                 talloc_free(tmp_ctx);
2870                 return 0;
2871         }
2872
2873
2874         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2875                 talloc_free(tmp_ctx);
2876                 return 0;
2877         }
2878
2879         talloc_free(ctdb->recovery_lock_file);
2880         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2881         ctdb->tunable.verify_recovery_lock = 0;
2882         if (ctdb->recovery_lock_fd != -1) {
2883                 close(ctdb->recovery_lock_fd);
2884                 ctdb->recovery_lock_fd = -1;
2885         }
2886
2887         talloc_free(tmp_ctx);
2888         return 0;
2889 }
2890
2891 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2892                       TALLOC_CTX *mem_ctx)
2893 {
2894         uint32_t pnn;
2895         struct ctdb_node_map *nodemap=NULL;
2896         struct ctdb_node_map *recmaster_nodemap=NULL;
2897         struct ctdb_node_map **remote_nodemaps=NULL;
2898         struct ctdb_vnn_map *vnnmap=NULL;
2899         struct ctdb_vnn_map *remote_vnnmap=NULL;
2900         int32_t debug_level;
2901         int i, j, ret;
2902
2903
2904
2905         /* verify that the main daemon is still running */
2906         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2907                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2908                 exit(-1);
2909         }
2910
2911         /* ping the local daemon to tell it we are alive */
2912         ctdb_ctrl_recd_ping(ctdb);
2913
2914         if (rec->election_timeout) {
2915                 /* an election is in progress */
2916                 return;
2917         }
2918
2919         /* read the debug level from the parent and update locally */
2920         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2921         if (ret !=0) {
2922                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2923                 return;
2924         }
2925         LogLevel = debug_level;
2926
2927
2928         /* We must check if we need to ban a node here but we want to do this
2929            as early as possible so we dont wait until we have pulled the node
2930            map from the local node. thats why we have the hardcoded value 20
2931         */
2932         for (i=0; i<ctdb->num_nodes; i++) {
2933                 struct ctdb_banning_state *ban_state;
2934
2935                 if (ctdb->nodes[i]->ban_state == NULL) {
2936                         continue;
2937                 }
2938                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2939                 if (ban_state->count < 20) {
2940                         continue;
2941                 }
2942                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2943                         ctdb->nodes[i]->pnn, ban_state->count,
2944                         ctdb->tunable.recovery_ban_period));
2945                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2946                 ban_state->count = 0;
2947         }
2948
2949         /* get relevant tunables */
2950         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2951         if (ret != 0) {
2952                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2953                 return;
2954         }
2955
2956         /* get the current recovery lock file from the server */
2957         if (update_recovery_lock_file(ctdb) != 0) {
2958                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2959                 return;
2960         }
2961
2962         /* Make sure that if recovery lock verification becomes disabled when
2963            we close the file
2964         */
2965         if (ctdb->tunable.verify_recovery_lock == 0) {
2966                 if (ctdb->recovery_lock_fd != -1) {
2967                         close(ctdb->recovery_lock_fd);
2968                         ctdb->recovery_lock_fd = -1;
2969                 }
2970         }
2971
2972         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2973         if (pnn == (uint32_t)-1) {
2974                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2975                 return;
2976         }
2977
2978         /* get the vnnmap */
2979         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2980         if (ret != 0) {
2981                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2982                 return;
2983         }
2984
2985
2986         /* get number of nodes */
2987         if (rec->nodemap) {
2988                 talloc_free(rec->nodemap);
2989                 rec->nodemap = NULL;
2990                 nodemap=NULL;
2991         }
2992         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2993         if (ret != 0) {
2994                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2995                 return;
2996         }
2997         nodemap = rec->nodemap;
2998
2999         /* check which node is the recovery master */
3000         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3001         if (ret != 0) {
3002                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3003                 return;
3004         }
3005
3006         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3007         if (rec->recmaster != pnn) {
3008                 if (rec->ip_reallocate_ctx != NULL) {
3009                         talloc_free(rec->ip_reallocate_ctx);
3010                         rec->ip_reallocate_ctx = NULL;
3011                         rec->reallocate_callers = NULL;
3012                 }
3013         }
3014         /* if there are takeovers requested, perform it and notify the waiters */
3015         if (rec->reallocate_callers) {
3016                 process_ipreallocate_requests(ctdb, rec);
3017         }
3018
3019         if (rec->recmaster == (uint32_t)-1) {
3020                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3021                 force_election(rec, pnn, nodemap);
3022                 return;
3023         }
3024
3025
3026         /* if the local daemon is STOPPED, we verify that the databases are
3027            also frozen and thet the recmode is set to active 
3028         */
3029         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3030                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3031                 if (ret != 0) {
3032                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3033                 }
3034                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3035                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3036
3037                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3038                         if (ret != 0) {
3039                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3040                                 return;
3041                         }
3042                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3043                         if (ret != 0) {
3044                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3045
3046                                 return;
3047                         }
3048                         return;
3049                 }
3050         }
3051         /* If the local node is stopped, verify we are not the recmaster 
3052            and yield this role if so
3053         */
3054         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3055                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3056                 force_election(rec, pnn, nodemap);
3057                 return;
3058         }
3059         
3060         /* check that we (recovery daemon) and the local ctdb daemon
3061            agrees on whether we are banned or not
3062         */
3063 //qqq
3064
3065         /* remember our own node flags */
3066         rec->node_flags = nodemap->nodes[pnn].flags;
3067
3068         /* count how many active nodes there are */
3069         rec->num_active    = 0;
3070         rec->num_connected = 0;
3071         for (i=0; i<nodemap->num; i++) {
3072                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3073                         rec->num_active++;
3074                 }
3075                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3076                         rec->num_connected++;
3077                 }
3078         }
3079
3080
3081         /* verify that the recmaster node is still active */
3082         for (j=0; j<nodemap->num; j++) {
3083                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3084                         break;
3085                 }
3086         }
3087
3088         if (j == nodemap->num) {
3089                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3090                 force_election(rec, pnn, nodemap);
3091                 return;
3092         }
3093
3094         /* if recovery master is disconnected we must elect a new recmaster */
3095         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3096                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3097                 force_election(rec, pnn, nodemap);
3098                 return;
3099         }
3100
3101         /* grap the nodemap from the recovery master to check if it is banned */
3102         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3103                                    mem_ctx, &recmaster_nodemap);
3104         if (ret != 0) {
3105                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3106                           nodemap->nodes[j].pnn));
3107                 return;
3108         }
3109
3110
3111         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3112                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3113                 force_election(rec, pnn, nodemap);
3114                 return;
3115         }
3116
3117
3118         /* verify that we have all ip addresses we should have and we dont
3119          * have addresses we shouldnt have.
3120          */ 
3121         if (ctdb->tunable.disable_ip_failover == 0) {
3122                 if (rec->ip_check_disable_ctx == NULL) {
3123                         if (verify_local_ip_allocation(ctdb, rec, pnn) != 0) {
3124                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3125                         }
3126                 }
3127         }
3128
3129
3130         /* if we are not the recmaster then we do not need to check
3131            if recovery is needed
3132          */
3133         if (pnn != rec->recmaster) {
3134                 return;
3135         }
3136
3137
3138         /* ensure our local copies of flags are right */
3139         ret = update_local_flags(rec, nodemap);
3140         if (ret == MONITOR_ELECTION_NEEDED) {
3141                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3142                 force_election(rec, pnn, nodemap);
3143                 return;
3144         }
3145         if (ret != MONITOR_OK) {
3146                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3147                 return;
3148         }
3149
3150         if (ctdb->num_nodes != nodemap->num) {
3151                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3152                 reload_nodes_file(ctdb);
3153                 return;
3154         }
3155
3156         /* verify that all active nodes agree that we are the recmaster */
3157         switch (verify_recmaster(rec, nodemap, pnn)) {
3158         case MONITOR_RECOVERY_NEEDED:
3159                 /* can not happen */
3160                 return;
3161         case MONITOR_ELECTION_NEEDED:
3162                 force_election(rec, pnn, nodemap);
3163                 return;
3164         case MONITOR_OK:
3165                 break;
3166         case MONITOR_FAILED:
3167                 return;
3168         }
3169
3170
3171         if (rec->need_recovery) {
3172                 /* a previous recovery didn't finish */
3173                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3174                 return;
3175         }
3176
3177         /* verify that all active nodes are in normal mode 
3178            and not in recovery mode 
3179         */
3180         switch (verify_recmode(ctdb, nodemap)) {
3181         case MONITOR_RECOVERY_NEEDED:
3182                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3183                 return;
3184         case MONITOR_FAILED:
3185                 return;
3186         case MONITOR_ELECTION_NEEDED:
3187                 /* can not happen */
3188         case MONITOR_OK:
3189                 break;
3190         }
3191
3192
3193         if (ctdb->tunable.verify_recovery_lock != 0) {
3194                 /* we should have the reclock - check its not stale */
3195                 ret = check_recovery_lock(ctdb);
3196                 if (ret != 0) {
3197                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3198                         ctdb_set_culprit(rec, ctdb->pnn);
3199                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3200                         return;
3201                 }
3202         }
3203
3204         /* get the nodemap for all active remote nodes
3205          */
3206         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3207         if (remote_nodemaps == NULL) {
3208                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3209                 return;
3210         }
3211         for(i=0; i<nodemap->num; i++) {
3212                 remote_nodemaps[i] = NULL;
3213         }
3214         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3215                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3216                 return;
3217         } 
3218
3219         /* verify that all other nodes have the same nodemap as we have
3220         */
3221         for (j=0; j<nodemap->num; j++) {
3222                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3223                         continue;
3224                 }
3225
3226                 if (remote_nodemaps[j] == NULL) {
3227                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3228                         ctdb_set_culprit(rec, j);
3229
3230                         return;
3231                 }
3232
3233                 /* if the nodes disagree on how many nodes there are
3234                    then this is a good reason to try recovery
3235                  */
3236                 if (remote_nodemaps[j]->num != nodemap->num) {
3237                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3238                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3239                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3240                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3241                         return;
3242                 }
3243
3244                 /* if the nodes disagree on which nodes exist and are
3245                    active, then that is also a good reason to do recovery
3246                  */
3247                 for (i=0;i<nodemap->num;i++) {
3248                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3249                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3250                                           nodemap->nodes[j].pnn, i, 
3251                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3252                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3253                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3254                                             vnnmap);
3255                                 return;
3256                         }
3257                 }
3258
3259                 /* verify the flags are consistent
3260                 */
3261                 for (i=0; i<nodemap->num; i++) {
3262                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3263                                 continue;
3264                         }
3265                         
3266                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3267                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3268                                   nodemap->nodes[j].pnn, 
3269                                   nodemap->nodes[i].pnn, 
3270                                   remote_nodemaps[j]->nodes[i].flags,
3271                                   nodemap->nodes[j].flags));
3272                                 if (i == j) {
3273                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3274                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3275                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3276                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3277                                                     vnnmap);
3278                                         return;
3279                                 } else {
3280                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3281                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3282                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3283                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3284                                                     vnnmap);
3285                                         return;
3286                                 }
3287                         }
3288                 }
3289         }
3290
3291
3292         /* there better be the same number of lmasters in the vnn map
3293            as there are active nodes or we will have to do a recovery
3294          */
3295         if (vnnmap->size != rec->num_active) {
3296                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3297                           vnnmap->size, rec->num_active));
3298                 ctdb_set_culprit(rec, ctdb->pnn);
3299                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3300                 return;
3301         }
3302
3303         /* verify that all active nodes in the nodemap also exist in 
3304            the vnnmap.
3305          */
3306         for (j=0; j<nodemap->num; j++) {
3307                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3308                         continue;
3309                 }
3310                 if (nodemap->nodes[j].pnn == pnn) {
3311                         continue;
3312                 }
3313
3314                 for (i=0; i<vnnmap->size; i++) {
3315                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3316                                 break;
3317                         }
3318                 }
3319                 if (i == vnnmap->size) {
3320                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3321                                   nodemap->nodes[j].pnn));
3322                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3323                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3324                         return;
3325                 }
3326         }
3327
3328         
3329         /* verify that all other nodes have the same vnnmap
3330            and are from the same generation
3331          */
3332         for (j=0; j<nodemap->num; j++) {
3333                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3334                         continue;
3335                 }
3336                 if (nodemap->nodes[j].pnn == pnn) {
3337                         continue;
3338                 }
3339
3340                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3341                                           mem_ctx, &remote_vnnmap);
3342                 if (ret != 0) {
3343                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3344                                   nodemap->nodes[j].pnn));
3345                         return;
3346                 }
3347
3348                 /* verify the vnnmap generation is the same */
3349                 if (vnnmap->generation != remote_vnnmap->generation) {
3350                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3351                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3352                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3353                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3354                         return;
3355                 }
3356
3357                 /* verify the vnnmap size is the same */
3358                 if (vnnmap->size != remote_vnnmap->size) {
3359                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3360                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3361                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3362                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3363                         return;
3364                 }
3365
3366                 /* verify the vnnmap is the same */
3367                 for (i=0;i<vnnmap->size;i++) {
3368                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3369                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3370                                           nodemap->nodes[j].pnn));
3371                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3372                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3373                                             vnnmap);
3374                                 return;
3375                         }
3376                 }
3377         }
3378
3379         /* we might need to change who has what IP assigned */
3380         if (rec->need_takeover_run) {
3381                 uint32_t culprit = (uint32_t)-1;
3382
3383                 rec->need_takeover_run = false;
3384
3385                 /* update the list of public ips that a node can handle for
3386                    all connected nodes
3387                 */
3388                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3389                 if (ret != 0) {
3390                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3391                                          culprit));
3392                         ctdb_set_culprit(rec, culprit);
3393                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3394                         return;
3395                 }
3396
3397                 /* execute the "startrecovery" event script on all nodes */
3398                 ret = run_startrecovery_eventscript(rec, nodemap);
3399                 if (ret!=0) {
3400                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3401                         ctdb_set_culprit(rec, ctdb->pnn);
3402                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3403                         return;
3404                 }
3405
3406                 ret = ctdb_takeover_run(ctdb, nodemap);
3407                 if (ret != 0) {
3408                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3409                         ctdb_set_culprit(rec, ctdb->pnn);
3410                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3411                         return;
3412                 }
3413
3414                 /* execute the "recovered" event script on all nodes */
3415                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3416 #if 0
3417 // we cant check whether the event completed successfully
3418 // since this script WILL fail if the node is in recovery mode
3419 // and if that race happens, the code here would just cause a second
3420 // cascading recovery.
3421                 if (ret!=0) {
3422                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3423                         ctdb_set_culprit(rec, ctdb->pnn);
3424                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3425                 }
3426 #endif
3427         }
3428 }
3429
3430 /*
3431   the main monitoring loop
3432  */
3433 static void monitor_cluster(struct ctdb_context *ctdb)
3434 {
3435         struct ctdb_recoverd *rec;
3436
3437         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3438
3439         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3440         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3441
3442         rec->ctdb = ctdb;
3443
3444         rec->priority_time = timeval_current();
3445
3446         /* register a message port for sending memory dumps */
3447         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3448
3449         /* register a message port for recovery elections */
3450         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3451
3452         /* when nodes are disabled/enabled */
3453         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3454
3455         /* when we are asked to puch out a flag change */
3456         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3457
3458         /* register a message port for vacuum fetch */
3459         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3460
3461         /* register a message port for reloadnodes  */
3462         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3463
3464         /* register a message port for performing a takeover run */
3465         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3466
3467         /* register a message port for disabling the ip check for a short while */
3468         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3469
3470         /* register a message port for updating the recovery daemons node assignment for an ip */
3471         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3472
3473         for (;;) {
3474                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3475                 struct timeval start;
3476                 double elapsed;
3477
3478                 if (!mem_ctx) {
3479                         DEBUG(DEBUG_CRIT,(__location__
3480                                           " Failed to create temp context\n"));
3481                         exit(-1);
3482                 }
3483
3484                 start = timeval_current();
3485                 main_loop(ctdb, rec, mem_ctx);
3486                 talloc_free(mem_ctx);
3487
3488                 /* we only check for recovery once every second */
3489                 elapsed = timeval_elapsed(&start);
3490                 if (elapsed < ctdb->tunable.recover_interval) {
3491                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3492                                           - elapsed);
3493                 }
3494         }
3495 }
3496
3497 /*
3498   event handler for when the main ctdbd dies
3499  */
3500 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3501                                  uint16_t flags, void *private_data)
3502 {
3503         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3504         _exit(1);
3505 }
3506
3507 /*
3508   called regularly to verify that the recovery daemon is still running
3509  */
3510 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3511                               struct timeval yt, void *p)
3512 {
3513         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3514
3515         if (kill(ctdb->recoverd_pid, 0) != 0) {
3516                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3517
3518                 ctdb_stop_recoverd(ctdb);
3519                 ctdb_stop_keepalive(ctdb);
3520                 ctdb_stop_monitoring(ctdb);
3521                 ctdb_release_all_ips(ctdb);
3522                 if (ctdb->methods != NULL) {
3523                         ctdb->methods->shutdown(ctdb);
3524                 }
3525                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3526
3527                 exit(10);       
3528         }
3529
3530         event_add_timed(ctdb->ev, ctdb, 
3531                         timeval_current_ofs(30, 0),
3532                         ctdb_check_recd, ctdb);
3533 }
3534
3535 static void recd_sig_child_handler(struct event_context *ev,
3536         struct signal_event *se, int signum, int count,
3537         void *dont_care, 
3538         void *private_data)
3539 {
3540 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3541         int status;
3542         pid_t pid = -1;
3543
3544         while (pid != 0) {
3545                 pid = waitpid(-1, &status, WNOHANG);
3546                 if (pid == -1) {
3547                         if (errno != ECHILD) {
3548                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3549                         }
3550                         return;
3551                 }
3552                 if (pid > 0) {
3553                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3554                 }
3555         }
3556 }
3557
3558 /*
3559   startup the recovery daemon as a child of the main ctdb daemon
3560  */
3561 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3562 {
3563         int fd[2];
3564         struct signal_event *se;
3565         struct tevent_fd *fde;
3566
3567         if (pipe(fd) != 0) {
3568                 return -1;
3569         }
3570
3571         ctdb->ctdbd_pid = getpid();
3572
3573         ctdb->recoverd_pid = fork();
3574         if (ctdb->recoverd_pid == -1) {
3575                 return -1;
3576         }
3577         
3578         if (ctdb->recoverd_pid != 0) {
3579                 close(fd[0]);
3580                 event_add_timed(ctdb->ev, ctdb, 
3581                                 timeval_current_ofs(30, 0),
3582                                 ctdb_check_recd, ctdb);
3583                 return 0;
3584         }
3585
3586         close(fd[1]);
3587
3588         srandom(getpid() ^ time(NULL));
3589
3590         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3591                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3592                 exit(1);
3593         }
3594
3595         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3596
3597         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3598                      ctdb_recoverd_parent, &fd[0]);     
3599         tevent_fd_set_auto_close(fde);
3600
3601         /* set up a handler to pick up sigchld */
3602         se = event_add_signal(ctdb->ev, ctdb,
3603                                      SIGCHLD, 0,
3604                                      recd_sig_child_handler,
3605                                      ctdb);
3606         if (se == NULL) {
3607                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3608                 exit(1);
3609         }
3610
3611         monitor_cluster(ctdb);
3612
3613         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3614         return -1;
3615 }
3616
3617 /*
3618   shutdown the recovery daemon
3619  */
3620 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3621 {
3622         if (ctdb->recoverd_pid == 0) {
3623                 return;
3624         }
3625
3626         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3627         kill(ctdb->recoverd_pid, SIGTERM);
3628 }