1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
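/* per-node banning state: how many times a node has recently been
   blamed for failed recoveries, and when it was last reported.
*/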
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
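/* timeouts for controls and monitoring, derived from the recovery
   tunables.
*/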
70 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
71 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
72
73
74 /*
75   ban a node for a period of time
76  */
77 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
78 {
79         int ret;
80         struct ctdb_context *ctdb = rec->ctdb;
81         struct ctdb_ban_time bantime;
82        
83         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
84
85         if (!ctdb_validate_pnn(ctdb, pnn)) {
86                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
87                 return;
88         }
89
90         bantime.pnn  = pnn;
91         bantime.time = ban_time;
92
93         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
94         if (ret != 0) {
95                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
96                 return;
97         }
98
99 }
100
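/* possible outcomes when the recovery daemon monitors the cluster */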
101 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
102
103
104 /*
105   run the "recovered" eventscript on all nodes
106  */
107 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
108 {
109         TALLOC_CTX *tmp_ctx;
110         uint32_t *nodes;
111
112         tmp_ctx = talloc_new(ctdb);
113         CTDB_NO_MEMORY(ctdb, tmp_ctx);
114
115         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
116         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
117                                         nodes, 0,
118                                         CONTROL_TIMEOUT(), false, tdb_null,
119                                         NULL, NULL,
120                                         NULL) != 0) {
121                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
122
123                 talloc_free(tmp_ctx);
124                 return -1;
125         }
126
127         talloc_free(tmp_ctx);
128         return 0;
129 }
130
131 /*
132   remember the trouble maker
133  */
134 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
135 {
136         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
137         struct ctdb_banning_state *ban_state;
138
139         if (culprit >= ctdb->num_nodes) {
140                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
141                 return;
142         }
143
144         if (ctdb->nodes[culprit]->ban_state == NULL) {
145                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
146                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
147
148                 
149         }
150         ban_state = ctdb->nodes[culprit]->ban_state;
151         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
152                 /* this was the first time in a long while this node
153                    misbehaved so we will forgive any old transgressions.
154                 */
155                 ban_state->count = 0;
156         }
157
158         ban_state->count += count;
159         ban_state->last_reported_time = timeval_current();
160         rec->last_culprit_node = culprit;
161 }
162
163 /*
164   remember the trouble maker
165  */
166 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
167 {
168         ctdb_set_culprit_count(rec, culprit, 1);
169 }
170
171
172 /* this callback is called for every node that failed to execute the
173    start recovery event
174 */
175 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
176 {
177         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
178
179         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
180
181         ctdb_set_culprit(rec, node_pnn);
182 }
183
184 /*
185   run the "startrecovery" eventscript on all nodes
186  */
187 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
188 {
189         TALLOC_CTX *tmp_ctx;
190         uint32_t *nodes;
191         struct ctdb_context *ctdb = rec->ctdb;
192
193         tmp_ctx = talloc_new(ctdb);
194         CTDB_NO_MEMORY(ctdb, tmp_ctx);
195
196         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
198                                         nodes, 0,
199                                         CONTROL_TIMEOUT(), false, tdb_null,
200                                         NULL,
201                                         startrecovery_fail_callback,
202                                         rec) != 0) {
203                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
204                 talloc_free(tmp_ctx);
205                 return -1;
206         }
207
208         talloc_free(tmp_ctx);
209         return 0;
210 }
211
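/*
  callback for the GET_CAPABILITIES control - store the returned
  capabilities in our local node array
 */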
212 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
213 {
214         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
215                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
216                 return;
217         }
218         if (node_pnn < ctdb->num_nodes) {
219                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
220         }
221 }
222
223 /*
224   update the node capabilities for all connected nodes
225  */
226 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
227 {
228         uint32_t *nodes;
229         TALLOC_CTX *tmp_ctx;
230
231         tmp_ctx = talloc_new(ctdb);
232         CTDB_NO_MEMORY(ctdb, tmp_ctx);
233
234         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
235         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
236                                         nodes, 0,
237                                         CONTROL_TIMEOUT(),
238                                         false, tdb_null,
239                                         async_getcap_callback, NULL,
240                                         NULL) != 0) {
241                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
242                 talloc_free(tmp_ctx);
243                 return -1;
244         }
245
246         talloc_free(tmp_ctx);
247         return 0;
248 }
249
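/*
  called for each node that failed to freeze when entering recovery;
  charge it a full set of culprit credits
 */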
250 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
251 {
252         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
253
254         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
255         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
256 }
257
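/*
  called for each node that failed to start the recovery transaction;
  charge it a full set of culprit credits
 */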
258 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
259 {
260         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
261
262         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
263         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
264 }
265
266 /*
267   change recovery mode on all nodes
268  */
269 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
270 {
271         TDB_DATA data;
272         uint32_t *nodes;
273         TALLOC_CTX *tmp_ctx;
274
275         tmp_ctx = talloc_new(ctdb);
276         CTDB_NO_MEMORY(ctdb, tmp_ctx);
277
278         /* freeze all nodes */
279         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
280         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
281                 int i;
282
283                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
284                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
285                                                 nodes, i,
286                                                 CONTROL_TIMEOUT(),
287                                                 false, tdb_null,
288                                                 NULL,
289                                                 set_recmode_fail_callback,
290                                                 rec) != 0) {
291                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
292                                 talloc_free(tmp_ctx);
293                                 return -1;
294                         }
295                 }
296         }
297
298
299         data.dsize = sizeof(uint32_t);
300         data.dptr = (unsigned char *)&rec_mode;
301
302         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
303                                         nodes, 0,
304                                         CONTROL_TIMEOUT(),
305                                         false, data,
306                                         NULL, NULL,
307                                         NULL) != 0) {
308                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
309                 talloc_free(tmp_ctx);
310                 return -1;
311         }
312
313         talloc_free(tmp_ctx);
314         return 0;
315 }
316
317 /*
318   change the recovery master on all nodes
319  */
320 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
321 {
322         TDB_DATA data;
323         TALLOC_CTX *tmp_ctx;
324         uint32_t *nodes;
325
326         tmp_ctx = talloc_new(ctdb);
327         CTDB_NO_MEMORY(ctdb, tmp_ctx);
328
329         data.dsize = sizeof(uint32_t);
330         data.dptr = (unsigned char *)&pnn;
331
332         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
333         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
334                                         nodes, 0,
335                                         CONTROL_TIMEOUT(), false, data,
336                                         NULL, NULL,
337                                         NULL) != 0) {
338                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
339                 talloc_free(tmp_ctx);
340                 return -1;
341         }
342
343         talloc_free(tmp_ctx);
344         return 0;
345 }
346
347 /* update all remote nodes to use the same db priority that we have.
348    This can fail if the remote node has not yet been upgraded to
349    support this function, so we always return success and never fail
350    a recovery if this call fails.
351 */
352 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
353         struct ctdb_node_map *nodemap, 
354         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
355 {
356         int db;
357         uint32_t *nodes;
358
359         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
360
361         /* step through all local databases */
362         for (db=0; db<dbmap->num;db++) {
363                 TDB_DATA data;
364                 struct ctdb_db_priority db_prio;
365                 int ret;
366
367                 db_prio.db_id     = dbmap->dbs[db].dbid;
368                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
369                 if (ret != 0) {
370                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
371                         continue;
372                 }
373
374                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
375
376                 data.dptr  = (uint8_t *)&db_prio;
377                 data.dsize = sizeof(db_prio);
378
379                 if (ctdb_client_async_control(ctdb,
380                                         CTDB_CONTROL_SET_DB_PRIORITY,
381                                         nodes, 0,
382                                         CONTROL_TIMEOUT(), false, data,
383                                         NULL, NULL,
384                                         NULL) != 0) {
385                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
386                 }
387         }
388
389         return 0;
390 }                       
391
392 /*
393   ensure all other nodes have attached to any databases that we have
394  */
395 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
396                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
397 {
398         int i, j, db, ret;
399         struct ctdb_dbid_map *remote_dbmap;
400
401         /* verify that all other nodes have all our databases */
402         for (j=0; j<nodemap->num; j++) {
403                 /* we don't need to check ourselves */
404                 if (nodemap->nodes[j].pnn == pnn) {
405                         continue;
406                 }
407                 /* dont check nodes that are unavailable */
408                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
409                         continue;
410                 }
411
412                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
413                                          mem_ctx, &remote_dbmap);
414                 if (ret != 0) {
415                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
416                         return -1;
417                 }
418
419                 /* step through all local databases */
420                 for (db=0; db<dbmap->num;db++) {
421                         const char *name;
422
423
424                         for (i=0;i<remote_dbmap->num;i++) {
425                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
426                                         break;
427                                 }
428                         }
429                         /* the remote node already has this database */
430                         if (i!=remote_dbmap->num) {
431                                 continue;
432                         }
433                         /* ok so we need to create this database */
434                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid,
435                                             mem_ctx, &name);
436                         if (ret != 0) {
437                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
438                                 return -1;
439                         }
440                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
441                                            mem_ctx, name, dbmap->dbs[db].persistent);
442                         if (ret != 0) {
443                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
444                                 return -1;
445                         }
446                 }
447         }
448
449         return 0;
450 }
451
452
453 /*
454   ensure we are attached to any databases that anyone else is attached to
455  */
456 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
457                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
458 {
459         int i, j, db, ret;
460         struct ctdb_dbid_map *remote_dbmap;
461
462         /* verify that we have all the databases any other node has */
463         for (j=0; j<nodemap->num; j++) {
464                 /* we don't need to check ourselves */
465                 if (nodemap->nodes[j].pnn == pnn) {
466                         continue;
467                 }
468                 /* dont check nodes that are unavailable */
469                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
470                         continue;
471                 }
472
473                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
474                                          mem_ctx, &remote_dbmap);
475                 if (ret != 0) {
476                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
477                         return -1;
478                 }
479
480                 /* step through all databases on the remote node */
481                 for (db=0; db<remote_dbmap->num;db++) {
482                         const char *name;
483
484                         for (i=0;i<(*dbmap)->num;i++) {
485                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
486                                         break;
487                                 }
488                         }
489                         /* we already have this db locally */
490                         if (i!=(*dbmap)->num) {
491                                 continue;
492                         }
493                         /* ok so we need to create this database and
494                            rebuild dbmap
495                          */
496                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
497                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
498                         if (ret != 0) {
499                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
500                                           nodemap->nodes[j].pnn));
501                                 return -1;
502                         }
503                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name,
504                                            remote_dbmap->dbs[db].persistent);
505                         if (ret != 0) {
506                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
507                                 return -1;
508                         }
509                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
510                         if (ret != 0) {
511                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
512                                 return -1;
513                         }
514                 }
515         }
516
517         return 0;
518 }
519
520
521 /*
522   pull the remote database contents from one node into the recdb
523  */
524 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
525                                     struct tdb_wrap *recdb, uint32_t dbid,
526                                     bool persistent)
527 {
528         int ret;
529         TDB_DATA outdata;
530         struct ctdb_marshall_buffer *reply;
531         struct ctdb_rec_data *rec;
532         int i;
533         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
534
535         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
536                                CONTROL_TIMEOUT(), &outdata);
537         if (ret != 0) {
538                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
539                 talloc_free(tmp_ctx);
540                 return -1;
541         }
542
543         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
544
545         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
546                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
547                 talloc_free(tmp_ctx);
548                 return -1;
549         }
550         
551         rec = (struct ctdb_rec_data *)&reply->data[0];
552         
553         for (i=0;
554              i<reply->count;
555              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
556                 TDB_DATA key, data;
557                 struct ctdb_ltdb_header *hdr;
558                 TDB_DATA existing;
559                 
560                 key.dptr = &rec->data[0];
561                 key.dsize = rec->keylen;
562                 data.dptr = &rec->data[key.dsize];
563                 data.dsize = rec->datalen;
564                 
565                 hdr = (struct ctdb_ltdb_header *)data.dptr;
566
567                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
568                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
569                         talloc_free(tmp_ctx);
570                         return -1;
571                 }
572
573                 /* fetch the existing record, if any */
574                 existing = tdb_fetch(recdb->tdb, key);
575                 
576                 if (existing.dptr != NULL) {
577                         struct ctdb_ltdb_header header;
578                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
579                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
580                                          (unsigned)existing.dsize, srcnode));
581                                 free(existing.dptr);
582                                 talloc_free(tmp_ctx);
583                                 return -1;
584                         }
585                         header = *(struct ctdb_ltdb_header *)existing.dptr;
586                         free(existing.dptr);
587                         if (!(header.rsn < hdr->rsn ||
588                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
589                                 continue;
590                         }
591                 }
592                 
593                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
594                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
595                         talloc_free(tmp_ctx);
596                         return -1;                              
597                 }
598         }
599
600         talloc_free(tmp_ctx);
601
602         return 0;
603 }
604
605 /*
606   pull all the remote database contents into the recdb
607  */
608 static int pull_remote_database(struct ctdb_context *ctdb,
609                                 struct ctdb_recoverd *rec, 
610                                 struct ctdb_node_map *nodemap, 
611                                 struct tdb_wrap *recdb, uint32_t dbid,
612                                 bool persistent)
613 {
614         int j;
615
616         /* pull all records from all other nodes across onto this node
617            (this merges based on rsn)
618         */
619         for (j=0; j<nodemap->num; j++) {
620                 /* dont merge from nodes that are unavailable */
621                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
622                         continue;
623                 }
624                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
625                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
626                                  nodemap->nodes[j].pnn));
627                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
628                         return -1;
629                 }
630         }
631         
632         return 0;
633 }
634
635
636 /*
637   update flags on all active nodes
638  */
639 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
640 {
641         int ret;
642
643         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
644         if (ret != 0) {
645                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
646                 return -1;
647         }
648
649         return 0;
650 }
651
652 /*
653   ensure all nodes have the same vnnmap we do
654  */
655 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
656                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
657 {
658         int j, ret;
659
660         /* push the new vnn map out to all the nodes */
661         for (j=0; j<nodemap->num; j++) {
662                 /* dont push to nodes that are unavailable */
663                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
664                         continue;
665                 }
666
667                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
668                 if (ret != 0) {
669                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
670                         return -1;
671                 }
672         }
673
674         return 0;
675 }
676
677
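/* state for one in-progress vacuum fetch: the records received from
   srcnode for a single database, processed one record at a time.
*/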
678 struct vacuum_info {
679         struct vacuum_info *next, *prev;
680         struct ctdb_recoverd *rec;
681         uint32_t srcnode;
682         struct ctdb_db_context *ctdb_db;
683         struct ctdb_marshall_buffer *recs;
684         struct ctdb_rec_data *r;
685 };
686
687 static void vacuum_fetch_next(struct vacuum_info *v);
688
689 /*
690   called when a vacuum fetch has completed - just free it and do the next one
691  */
692 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
693 {
694         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
695         talloc_free(state);
696         vacuum_fetch_next(v);
697 }
698
699
700 /*
701   process the next element from the vacuum list
702 */
703 static void vacuum_fetch_next(struct vacuum_info *v)
704 {
705         struct ctdb_call call;
706         struct ctdb_rec_data *r;
707
708         while (v->recs->count) {
709                 struct ctdb_client_call_state *state;
710                 TDB_DATA data;
711                 struct ctdb_ltdb_header *hdr;
712
713                 ZERO_STRUCT(call);
714                 call.call_id = CTDB_NULL_FUNC;
715                 call.flags = CTDB_IMMEDIATE_MIGRATION;
716
717                 r = v->r;
718                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
719                 v->recs->count--;
720
721                 call.key.dptr = &r->data[0];
722                 call.key.dsize = r->keylen;
723
724                 /* ensure we don't block this daemon - just skip a record if we can't get
725                    the chainlock */
726                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
727                         continue;
728                 }
729
730                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
731                 if (data.dptr == NULL) {
732                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
733                         continue;
734                 }
735
736                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
737                         free(data.dptr);
738                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
739                         continue;
740                 }
741                 
742                 hdr = (struct ctdb_ltdb_header *)data.dptr;
743                 if (hdr->dmaster == v->rec->ctdb->pnn) {
744                         /* it's already local */
745                         free(data.dptr);
746                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
747                         continue;
748                 }
749
750                 free(data.dptr);
751
752                 state = ctdb_call_send(v->ctdb_db, &call);
753                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
754                 if (state == NULL) {
755                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
756                         talloc_free(v);
757                         return;
758                 }
759                 state->async.fn = vacuum_fetch_callback;
760                 state->async.private_data = v;
761                 return;
762         }
763
764         talloc_free(v);
765 }
766
767
768 /*
769   destroy a vacuum info structure
770  */
771 static int vacuum_info_destructor(struct vacuum_info *v)
772 {
773         DLIST_REMOVE(v->rec->vacuum_info, v);
774         return 0;
775 }
776
777
778 /*
779   handler for vacuum fetch
780 */
781 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
782                                  TDB_DATA data, void *private_data)
783 {
784         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
785         struct ctdb_marshall_buffer *recs;
786         int ret, i;
787         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
788         const char *name;
789         struct ctdb_dbid_map *dbmap=NULL;
790         bool persistent = false;
791         struct ctdb_db_context *ctdb_db;
792         struct ctdb_rec_data *r;
793         uint32_t srcnode;
794         struct vacuum_info *v;
795
796         recs = (struct ctdb_marshall_buffer *)data.dptr;
797         r = (struct ctdb_rec_data *)&recs->data[0];
798
799         if (recs->count == 0) {
800                 talloc_free(tmp_ctx);
801                 return;
802         }
803
804         srcnode = r->reqid;
805
806         for (v=rec->vacuum_info;v;v=v->next) {
807                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
808                         /* we're already working on records from this node */
809                         talloc_free(tmp_ctx);
810                         return;
811                 }
812         }
813
814         /* work out if the database is persistent */
815         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
816         if (ret != 0) {
817                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
818                 talloc_free(tmp_ctx);
819                 return;
820         }
821
822         for (i=0;i<dbmap->num;i++) {
823                 if (dbmap->dbs[i].dbid == recs->db_id) {
824                         persistent = dbmap->dbs[i].persistent;
825                         break;
826                 }
827         }
828         if (i == dbmap->num) {
829                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
830                 talloc_free(tmp_ctx);
831                 return;         
832         }
833
834         /* find the name of this database */
835         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
836                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
837                 talloc_free(tmp_ctx);
838                 return;
839         }
840
841         /* attach to it */
842         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
843         if (ctdb_db == NULL) {
844                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
845                 talloc_free(tmp_ctx);
846                 return;
847         }
848
849         v = talloc_zero(rec, struct vacuum_info);
850         if (v == NULL) {
851                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
852                 talloc_free(tmp_ctx);
853                 return;
854         }
855
856         v->rec = rec;
857         v->srcnode = srcnode;
858         v->ctdb_db = ctdb_db;
859         v->recs = talloc_memdup(v, recs, data.dsize);
860         if (v->recs == NULL) {
861                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
862                 talloc_free(v);
863                 talloc_free(tmp_ctx);
864                 return;         
865         }
866         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
867
868         DLIST_ADD(rec->vacuum_info, v);
869
870         talloc_set_destructor(v, vacuum_info_destructor);
871
872         vacuum_fetch_next(v);
873         talloc_free(tmp_ctx);
874 }
875
876
877 /*
878   called when ctdb_wait_timeout should finish
879  */
880 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
881                               struct timeval yt, void *p)
882 {
883         uint32_t *timed_out = (uint32_t *)p;
884         (*timed_out) = 1;
885 }
886
887 /*
888   wait for a given number of seconds
889  */
890 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
891 {
892         uint32_t timed_out = 0;
893         time_t usecs = (secs - (time_t)secs) * 1000000;
894         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
895         while (!timed_out) {
896                 event_loop_once(ctdb->ev);
897         }
898 }
899
900 /*
901   called when an election times out (ends)
902  */
903 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
904                                   struct timeval t, void *p)
905 {
906         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
907         rec->election_timeout = NULL;
908         fast_start = false;
909
910         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
911 }
912
913
914 /*
915   wait for an election to finish. It finishes election_timeout seconds after
916   the last election packet is received
917  */
918 static void ctdb_wait_election(struct ctdb_recoverd *rec)
919 {
920         struct ctdb_context *ctdb = rec->ctdb;
921         while (rec->election_timeout) {
922                 event_loop_once(ctdb->ev);
923         }
924 }
925
926 /*
927   Update our local flags from all remote connected nodes. 
928   This is only run when we are, or believe we are, the recovery master
929  */
930 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
931 {
932         int j;
933         struct ctdb_context *ctdb = rec->ctdb;
934         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
935
936         /* get the nodemap for all active remote nodes and verify
937            they are the same as for this node
938          */
939         for (j=0; j<nodemap->num; j++) {
940                 struct ctdb_node_map *remote_nodemap=NULL;
941                 int ret;
942
943                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
944                         continue;
945                 }
946                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
947                         continue;
948                 }
949
950                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
951                                            mem_ctx, &remote_nodemap);
952                 if (ret != 0) {
953                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
954                                   nodemap->nodes[j].pnn));
955                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
956                         talloc_free(mem_ctx);
957                         return MONITOR_FAILED;
958                 }
959                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
960                         /* We should tell our daemon about this so it
961                            updates its flags or else we will log the same 
962                            message again in the next iteration of recovery.
963                            Since we are the recovery master we can just as
964                            well update the flags on all nodes.
965                         */
966                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
967                         if (ret != 0) {
968                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
969                                 talloc_free(mem_ctx); return MONITOR_FAILED;
970                         }
971
972                         /* Update our local copy of the flags in the recovery
973                            daemon.
974                         */
975                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
976                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
977                                  nodemap->nodes[j].flags));
978                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
979                 }
980                 talloc_free(remote_nodemap);
981         }
982         talloc_free(mem_ctx);
983         return MONITOR_OK;
984 }
985
986
987 /* Create a new random generation id.
988    The generation id can not be the INVALID_GENERATION id
989 */
990 static uint32_t new_generation(void)
991 {
992         uint32_t generation;
993
994         while (1) {
995                 generation = random();
996
997                 if (generation != INVALID_GENERATION) {
998                         break;
999                 }
1000         }
1001
1002         return generation;
1003 }
1004
1005
1006 /*
1007   create a temporary working database
1008  */
1009 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1010 {
1011         char *name;
1012         struct tdb_wrap *recdb;
1013         unsigned tdb_flags;
1014
1015         /* open up the temporary recovery database */
1016         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1017                                ctdb->db_directory_state,
1018                                ctdb->pnn);
1019         if (name == NULL) {
1020                 return NULL;
1021         }
1022         unlink(name);
1023
1024         tdb_flags = TDB_NOLOCK;
1025         if (ctdb->valgrinding) {
1026                 tdb_flags |= TDB_NOMMAP;
1027         }
1028         tdb_flags |= TDB_DISALLOW_NESTING;
1029
1030         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1031                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1032         if (recdb == NULL) {
1033                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1034         }
1035
1036         talloc_free(name);
1037
1038         return recdb;
1039 }
1040
1041
1042 /* 
1043    a traverse function for pulling all relevant records from recdb
1044  */
1045 struct recdb_data {
1046         struct ctdb_context *ctdb;
1047         struct ctdb_marshall_buffer *recdata;
1048         uint32_t len;
1049         bool failed;
1050         bool persistent;
1051 };
1052
1053 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1054 {
1055         struct recdb_data *params = (struct recdb_data *)p;
1056         struct ctdb_rec_data *rec;
1057         struct ctdb_ltdb_header *hdr;
1058
1059         /* skip empty records */
1060         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1061                 return 0;
1062         }
1063
1064         /* update the dmaster field to point to us */
1065         hdr = (struct ctdb_ltdb_header *)data.dptr;
1066         if (!params->persistent) {
1067                 hdr->dmaster = params->ctdb->pnn;
1068         }
1069
1070         /* add the record to the blob ready to send to the nodes */
1071         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1072         if (rec == NULL) {
1073                 params->failed = true;
1074                 return -1;
1075         }
1076         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1077         if (params->recdata == NULL) {
1078                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1079                          rec->length + params->len, params->recdata->count));
1080                 params->failed = true;
1081                 return -1;
1082         }
1083         params->recdata->count++;
1084         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1085         params->len += rec->length;
1086         talloc_free(rec);
1087
1088         return 0;
1089 }
1090
1091 /*
1092   push the recdb database out to all nodes
1093  */
1094 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1095                                bool persistent,
1096                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1097 {
1098         struct recdb_data params;
1099         struct ctdb_marshall_buffer *recdata;
1100         TDB_DATA outdata;
1101         TALLOC_CTX *tmp_ctx;
1102         uint32_t *nodes;
1103
1104         tmp_ctx = talloc_new(ctdb);
1105         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1106
1107         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1108         CTDB_NO_MEMORY(ctdb, recdata);
1109
1110         recdata->db_id = dbid;
1111
1112         params.ctdb = ctdb;
1113         params.recdata = recdata;
1114         params.len = offsetof(struct ctdb_marshall_buffer, data);
1115         params.failed = false;
1116         params.persistent = persistent;
1117
1118         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1119                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1120                 talloc_free(params.recdata);
1121                 talloc_free(tmp_ctx);
1122                 return -1;
1123         }
1124
1125         if (params.failed) {
1126                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1127                 talloc_free(params.recdata);
1128                 talloc_free(tmp_ctx);
1129                 return -1;              
1130         }
1131
1132         recdata = params.recdata;
1133
1134         outdata.dptr = (void *)recdata;
1135         outdata.dsize = params.len;
1136
1137         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1138         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1139                                         nodes, 0,
1140                                         CONTROL_TIMEOUT(), false, outdata,
1141                                         NULL, NULL,
1142                                         NULL) != 0) {
1143                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1144                 talloc_free(recdata);
1145                 talloc_free(tmp_ctx);
1146                 return -1;
1147         }
1148
1149         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1150                   dbid, recdata->count));
1151
1152         talloc_free(recdata);
1153         talloc_free(tmp_ctx);
1154
1155         return 0;
1156 }
1157
1158
1159 /*
1160   go through a full recovery on one database 
1161  */
1162 static int recover_database(struct ctdb_recoverd *rec, 
1163                             TALLOC_CTX *mem_ctx,
1164                             uint32_t dbid,
1165                             bool persistent,
1166                             uint32_t pnn, 
1167                             struct ctdb_node_map *nodemap,
1168                             uint32_t transaction_id)
1169 {
1170         struct tdb_wrap *recdb;
1171         int ret;
1172         struct ctdb_context *ctdb = rec->ctdb;
1173         TDB_DATA data;
1174         struct ctdb_control_wipe_database w;
1175         uint32_t *nodes;
1176
1177         recdb = create_recdb(ctdb, mem_ctx);
1178         if (recdb == NULL) {
1179                 return -1;
1180         }
1181
1182         /* pull all remote databases onto the recdb */
1183         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1184         if (ret != 0) {
1185                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1186                 return -1;
1187         }
1188
1189         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1190
1191         /* wipe all the remote databases. This is safe as we are in a transaction */
1192         w.db_id = dbid;
1193         w.transaction_id = transaction_id;
1194
1195         data.dptr = (void *)&w;
1196         data.dsize = sizeof(w);
1197
1198         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1199         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1200                                         nodes, 0,
1201                                         CONTROL_TIMEOUT(), false, data,
1202                                         NULL, NULL,
1203                                         NULL) != 0) {
1204                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1205                 talloc_free(recdb);
1206                 return -1;
1207         }
1208         
1209         /* push out the correct database. This sets the dmaster and skips 
1210            the empty records */
1211         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1212         if (ret != 0) {
1213                 talloc_free(recdb);
1214                 return -1;
1215         }
1216
1217         /* all done with this database */
1218         talloc_free(recdb);
1219
1220         return 0;
1221 }
1222
1223 /*
1224   reload the nodes file 
1225 */
1226 static void reload_nodes_file(struct ctdb_context *ctdb)
1227 {
1228         ctdb->nodes = NULL;
1229         ctdb_load_nodes_file(ctdb);
1230 }
1231
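/*
  refresh the known and available public IP lists for every node in the
  nodemap. On failure the pnn of the offending node is reported via *culprit.
 */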
1232 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1233                                          struct ctdb_recoverd *rec,
1234                                          struct ctdb_node_map *nodemap,
1235                                          uint32_t *culprit)
1236 {
1237         int j;
1238         int ret;
1239
1240         if (ctdb->num_nodes != nodemap->num) {
1241                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1242                                   ctdb->num_nodes, nodemap->num));
1243                 if (culprit) {
1244                         *culprit = ctdb->pnn;
1245                 }
1246                 return -1;
1247         }
1248
1249         for (j=0; j<nodemap->num; j++) {
1250                 /* release any existing data */
1251                 if (ctdb->nodes[j]->known_public_ips) {
1252                         talloc_free(ctdb->nodes[j]->known_public_ips);
1253                         ctdb->nodes[j]->known_public_ips = NULL;
1254                 }
1255                 if (ctdb->nodes[j]->available_public_ips) {
1256                         talloc_free(ctdb->nodes[j]->available_public_ips);
1257                         ctdb->nodes[j]->available_public_ips = NULL;
1258                 }
1259
1260                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1261                         continue;
1262                 }
1263
1264                 /* grab a new shiny list of public ips from the node */
1265                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1266                                         CONTROL_TIMEOUT(),
1267                                         ctdb->nodes[j]->pnn,
1268                                         ctdb->nodes,
1269                                         0,
1270                                         &ctdb->nodes[j]->known_public_ips);
1271                 if (ret != 0) {
1272                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1273                                 ctdb->nodes[j]->pnn));
1274                         if (culprit) {
1275                                 *culprit = ctdb->nodes[j]->pnn;
1276                         }
1277                         return -1;
1278                 }
1279
1280                 if (ctdb->tunable.disable_ip_failover == 0) {
1281                         if (rec->ip_check_disable_ctx == NULL) {
1282                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1283                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1284                                         rec->need_takeover_run = true;
1285                                 }
1286                         }
1287                 }
1288
1289                 /* grab a new shiny list of public ips from the node */
1290                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1291                                         CONTROL_TIMEOUT(),
1292                                         ctdb->nodes[j]->pnn,
1293                                         ctdb->nodes,
1294                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1295                                         &ctdb->nodes[j]->available_public_ips);
1296                 if (ret != 0) {
1297                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1298                                 ctdb->nodes[j]->pnn));
1299                         if (culprit) {
1300                                 *culprit = ctdb->nodes[j]->pnn;
1301                         }
1302                         return -1;
1303                 }
1304         }
1305
1306         return 0;
1307 }
1308
1309 /* when we start a recovery, make sure all nodes use the same reclock file
1310    setting
1311 */
1312 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1313 {
1314         struct ctdb_context *ctdb = rec->ctdb;
1315         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1316         TDB_DATA data;
1317         uint32_t *nodes;
1318
1319         if (ctdb->recovery_lock_file == NULL) {
1320                 data.dptr  = NULL;
1321                 data.dsize = 0;
1322         } else {
1323                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1324                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1325         }
1326
1327         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1328         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1329                                         nodes, 0,
1330                                         CONTROL_TIMEOUT(),
1331                                         false, data,
1332                                         NULL, NULL,
1333                                         rec) != 0) {
1334                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1335                 talloc_free(tmp_ctx);
1336                 return -1;
1337         }
1338
1339         talloc_free(tmp_ctx);
1340         return 0;
1341 }
1342
1343
1344 /*
1345   we are the recmaster, and recovery is needed - start a recovery run
1346  */
1347 static int do_recovery(struct ctdb_recoverd *rec, 
1348                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1349                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1350 {
1351         struct ctdb_context *ctdb = rec->ctdb;
1352         int i, j, ret;
1353         uint32_t generation;
1354         struct ctdb_dbid_map *dbmap;
1355         TDB_DATA data;
1356         uint32_t *nodes;
1357         struct timeval start_time;
1358         uint32_t culprit = (uint32_t)-1;
1359
1360         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1361
1362         /* if recovery fails, force it again */
1363         rec->need_recovery = true;
1364
1365         for (i=0; i<ctdb->num_nodes; i++) {
1366                 struct ctdb_banning_state *ban_state;
1367
1368                 if (ctdb->nodes[i]->ban_state == NULL) {
1369                         continue;
1370                 }
1371                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1372                 if (ban_state->count < 2*ctdb->num_nodes) {
1373                         continue;
1374                 }
1375                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1376                         ctdb->nodes[i]->pnn, ban_state->count,
1377                         ctdb->tunable.recovery_ban_period));
1378                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1379                 ban_state->count = 0;
1380         }
1381
1382
1383         if (ctdb->tunable.verify_recovery_lock != 0) {
1384                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1385                 start_time = timeval_current();
1386                 if (!ctdb_recovery_lock(ctdb, true)) {
1387                         ctdb_set_culprit(rec, pnn);
1388                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1389                         return -1;
1390                 }
1391                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1392                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1393         }
1394
1395         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1396
1397         /* get a list of all databases */
1398         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1399         if (ret != 0) {
1400                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
1401                 return -1;
1402         }
1403
1404         /* we do the db creation before we set the recovery mode, so the freeze happens
1405            on all databases we will be dealing with. */
1406
1407         /* verify that we have all the databases any other node has */
1408         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1409         if (ret != 0) {
1410                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1411                 return -1;
1412         }
1413
1414         /* verify that all other nodes have all our databases */
1415         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1416         if (ret != 0) {
1417                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1418                 return -1;
1419         }
1420         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1421
1422         /* update the database priority for all remote databases */
1423         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1424         if (ret != 0) {
1425                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1426         }
1427         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1428
1429
1430         /* update all other nodes to use the same setting for reclock files
1431            as the local recovery master.
1432         */
1433         sync_recovery_lock_file_across_cluster(rec);
1434
1435         /* set recovery mode to active on all nodes */
1436         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1437         if (ret != 0) {
1438                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1439                 return -1;
1440         }
1441
1442         /* execute the "startrecovery" event script on all nodes */
1443         ret = run_startrecovery_eventscript(rec, nodemap);
1444         if (ret!=0) {
1445                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1446                 return -1;
1447         }
1448
1449         /*
1450           update all nodes to have the same flags that we have
1451          */
1452         for (i=0;i<nodemap->num;i++) {
1453                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1454                         continue;
1455                 }
1456
1457                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1458                 if (ret != 0) {
1459                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1460                         return -1;
1461                 }
1462         }
1463
1464         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1465
1466         /* pick a new generation number */
1467         generation = new_generation();
1468
1469         /* change the vnnmap on this node to use the new generation 
1470            number but not on any other nodes.
1471            this guarantees that if we abort the recovery prematurely
1472            for some reason (a node stops responding?)
1473            that we can just return immediately and we will reenter
1474            recovery shortly again.
1475            I.e. we deliberately leave the cluster with an inconsistent
1476            generation id to allow us to abort recovery at any stage and
1477            just restart it from scratch.
1478          */
1479         vnnmap->generation = generation;
1480         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1481         if (ret != 0) {
1482                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1483                 return -1;
1484         }
1485
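        /* start a cluster wide transaction, tagged with the new generation,
           on all active nodes. if starting it fails anywhere we cancel the
           transaction on the other nodes and abort; the recovery will be
           retried since need_recovery is still set.
        */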
1486         data.dptr = (void *)&generation;
1487         data.dsize = sizeof(uint32_t);
1488
1489         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1490         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1491                                         nodes, 0,
1492                                         CONTROL_TIMEOUT(), false, data,
1493                                         NULL,
1494                                         transaction_start_fail_callback,
1495                                         rec) != 0) {
1496                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1497                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1498                                         nodes, 0,
1499                                         CONTROL_TIMEOUT(), false, tdb_null,
1500                                         NULL,
1501                                         NULL,
1502                                         NULL) != 0) {
1503                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1504                 }
1505                 return -1;
1506         }
1507
1508         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1509
1510         for (i=0;i<dbmap->num;i++) {
1511                 ret = recover_database(rec, mem_ctx,
1512                                        dbmap->dbs[i].dbid,
1513                                        dbmap->dbs[i].persistent,
1514                                        pnn, nodemap, generation);
1515                 if (ret != 0) {
1516                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1517                         return -1;
1518                 }
1519         }
1520
1521         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1522
1523         /* commit all the changes */
1524         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1525                                         nodes, 0,
1526                                         CONTROL_TIMEOUT(), false, data,
1527                                         NULL, NULL,
1528                                         NULL) != 0) {
1529                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1530                 return -1;
1531         }
1532
1533         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1534         
1535
1536         /* update the capabilities for all nodes */
1537         ret = update_capabilities(ctdb, nodemap);
1538         if (ret!=0) {
1539                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1540                 return -1;
1541         }
1542
1543         /* build a new vnn map with all the currently active and
1544            unbanned nodes */
1545         generation = new_generation();
1546         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1547         CTDB_NO_MEMORY(ctdb, vnnmap);
1548         vnnmap->generation = generation;
1549         vnnmap->size = 0;
1550         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1551         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1552         for (i=j=0;i<nodemap->num;i++) {
1553                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1554                         continue;
1555                 }
1556                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1557                         /* this node cannot be an lmaster */
1558                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an lmaster, skipping it\n", i));
1559                         continue;
1560                 }
1561
1562                 vnnmap->size++;
1563                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1564                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1565                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1566
1567         }
1568         if (vnnmap->size == 0) {
1569                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1570                 vnnmap->size++;
1571                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1572                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1573                 vnnmap->map[0] = pnn;
1574         }       
1575
1576         /* update to the new vnnmap on all nodes */
1577         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1578         if (ret != 0) {
1579                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1580                 return -1;
1581         }
1582
1583         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1584
1585         /* update recmaster to point to us for all nodes */
1586         ret = set_recovery_master(ctdb, nodemap, pnn);
1587         if (ret!=0) {
1588                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1589                 return -1;
1590         }
1591
1592         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1593
1594         /*
1595           update all nodes to have the same flags that we have
1596          */
1597         for (i=0;i<nodemap->num;i++) {
1598                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1599                         continue;
1600                 }
1601
1602                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1603                 if (ret != 0) {
1604                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1605                         return -1;
1606                 }
1607         }
1608
1609         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1610
1611         /* disable recovery mode */
1612         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1613         if (ret != 0) {
1614                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1615                 return -1;
1616         }
1617
1618         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1619
1620         /*
1621           tell nodes to takeover their public IPs
1622          */
1623         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1624         if (ret != 0) {
1625                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1626                                  culprit));
1627                 return -1;
1628         }
1629         rec->need_takeover_run = false;
1630         ret = ctdb_takeover_run(ctdb, nodemap);
1631         if (ret != 0) {
1632                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1633                 return -1;
1634         }
1635         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1636
1637         /* execute the "recovered" event script on all nodes */
1638         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1639         if (ret!=0) {
1640                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1641                 return -1;
1642         }
1643
1644         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1645
1646         /* send a message to all clients telling them that the cluster 
1647            has been reconfigured */
1648         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1649
1650         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1651
1652         rec->need_recovery = false;
1653
1654         /* we managed to complete a full recovery, make sure to forgive
1655            any past sins by the nodes that could now participate in the
1656            recovery.
1657         */
1658         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1659         for (i=0;i<nodemap->num;i++) {
1660                 struct ctdb_banning_state *ban_state;
1661
1662                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1663                         continue;
1664                 }
1665
1666                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1667                 if (ban_state == NULL) {
1668                         continue;
1669                 }
1670
1671                 ban_state->count = 0;
1672         }
1673
1674
1675         /* We just finished a recovery successfully. 
1676            We now wait for rerecovery_timeout before we allow 
1677            another recovery to take place.
1678         */
1679         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1680         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1681         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1682
1683         return 0;
1684 }
1685
1686
1687 /*
1688   elections are won by first checking the number of connected nodes, then
1689   the priority time, then the pnn
1690  */
1691 struct election_message {
1692         uint32_t num_connected;
1693         struct timeval priority_time;
1694         uint32_t pnn;
1695         uint32_t node_flags;
1696 };
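/*
  Illustrative example (not part of the original source): the candidate with
  the most connected nodes wins outright; if two candidates report the same
  num_connected, the one with the earlier priority_time (the longest running
  recovery daemon) wins, and only when that is also equal does the pnn break
  the tie.  See ctdb_election_data()/ctdb_election_win() below.
 */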
1697
1698 /*
1699   form this node's election data
1700  */
1701 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1702 {
1703         int ret, i;
1704         struct ctdb_node_map *nodemap;
1705         struct ctdb_context *ctdb = rec->ctdb;
1706
1707         ZERO_STRUCTP(em);
1708
1709         em->pnn = rec->ctdb->pnn;
1710         em->priority_time = rec->priority_time;
1711
1712         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1713         if (ret != 0) {
1714                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1715                 return;
1716         }
1717
1718         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1719         em->node_flags = rec->node_flags;
1720
1721         for (i=0;i<nodemap->num;i++) {
1722                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1723                         em->num_connected++;
1724                 }
1725         }
1726
1727         /* we shouldn't try to win this election if we can't be a recmaster */
1728         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1729                 em->num_connected = 0;
1730                 em->priority_time = timeval_current();
1731         }
1732
1733         talloc_free(nodemap);
1734 }
1735
1736 /*
1737   see if the given election data wins
1738  */
1739 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1740 {
1741         struct election_message myem;
1742         int cmp = 0;
1743
1744         ctdb_election_data(rec, &myem);
1745
1746         /* we can't win if we don't have the recmaster capability */
1747         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1748                 return false;
1749         }
1750
1751         /* we can't win if we are banned */
1752         if (rec->node_flags & NODE_FLAGS_BANNED) {
1753                 return false;
1754         }       
1755
1756         /* we can't win if we are stopped */
1757         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1758                 return false;
1759         }       
1760
1761         /* we will automatically win if the other node is banned */
1762         if (em->node_flags & NODE_FLAGS_BANNED) {
1763                 return true;
1764         }
1765
1766         /* we will automatically win if the other node is stopped */
1767         if (em->node_flags & NODE_FLAGS_STOPPED) {
1768                 return true;
1769         }
1770
1771         /* try to use the most connected node */
1772         if (cmp == 0) {
1773                 cmp = (int)myem.num_connected - (int)em->num_connected;
1774         }
1775
1776         /* then the longest running node */
1777         if (cmp == 0) {
1778                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1779         }
1780
1781         if (cmp == 0) {
1782                 cmp = (int)myem.pnn - (int)em->pnn;
1783         }
1784
1785         return cmp > 0;
1786 }
1787
1788 /*
1789   send out an election request
1790  */
1791 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1792 {
1793         int ret;
1794         TDB_DATA election_data;
1795         struct election_message emsg;
1796         uint64_t srvid;
1797         struct ctdb_context *ctdb = rec->ctdb;
1798
1799         srvid = CTDB_SRVID_RECOVERY;
1800
1801         ctdb_election_data(rec, &emsg);
1802
1803         election_data.dsize = sizeof(struct election_message);
1804         election_data.dptr  = (unsigned char *)&emsg;
1805
1806
1807         /* send an election message to all active nodes */
1808         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1809         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1810
1811
1812         /* A new node that is already frozen has entered the cluster.
1813            The existing nodes are not frozen and don't need to be frozen
1814            until the election has ended and we start the actual recovery
1815         */
1816         if (update_recmaster == true) {
1817                 /* first we assume we will win the election and set 
1818                    recoverymaster to be ourself on the current node
1819                  */
1820                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1821                 if (ret != 0) {
1822                         DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster on local node\n"));
1823                         return -1;
1824                 }
1825         }
1826
1827
1828         return 0;
1829 }
1830
1831 /*
1832   this function will unban all nodes in the cluster
1833 */
1834 static void unban_all_nodes(struct ctdb_context *ctdb)
1835 {
1836         int ret, i;
1837         struct ctdb_node_map *nodemap;
1838         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1839         
1840         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1841         if (ret != 0) {
1842                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1843                 return;
1844         }
1845
1846         for (i=0;i<nodemap->num;i++) {
1847                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1848                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1849                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1850                 }
1851         }
1852
1853         talloc_free(tmp_ctx);
1854 }
1855
1856
1857 /*
1858   we think we are winning the election - send a broadcast election request
1859  */
1860 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1861 {
1862         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1863         int ret;
1864
1865         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1866         if (ret != 0) {
1867                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1868         }
1869
1870         talloc_free(rec->send_election_te);
1871         rec->send_election_te = NULL;
1872 }
1873
1874 /*
1875   handler for memory dumps
1876 */
1877 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1878                              TDB_DATA data, void *private_data)
1879 {
1880         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1881         TDB_DATA *dump;
1882         int ret;
1883         struct rd_memdump_reply *rd;
1884
1885         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1886                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1887                 talloc_free(tmp_ctx);
1888                 return;
1889         }
1890         rd = (struct rd_memdump_reply *)data.dptr;
1891
1892         dump = talloc_zero(tmp_ctx, TDB_DATA);
1893         if (dump == NULL) {
1894                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1895                 talloc_free(tmp_ctx);
1896                 return;
1897         }
1898         ret = ctdb_dump_memory(ctdb, dump);
1899         if (ret != 0) {
1900                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1901                 talloc_free(tmp_ctx);
1902                 return;
1903         }
1904
1905         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1906
1907         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1908         if (ret != 0) {
1909                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1910                 talloc_free(tmp_ctx);
1911                 return;
1912         }
1913
1914         talloc_free(tmp_ctx);
1915 }
1916
1917 /*
1918   handler for reload_nodes
1919 */
1920 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1921                              TDB_DATA data, void *private_data)
1922 {
1923         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1924
1925         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1926
1927         reload_nodes_file(rec->ctdb);
1928 }
1929
1930
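/*
  timed event callback: the "disable ip check" period has expired, so free
  the disable context and allow the public ip consistency check to run again
 */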
1931 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1932                               struct timeval yt, void *p)
1933 {
1934         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1935
1936         talloc_free(rec->ip_check_disable_ctx);
1937         rec->ip_check_disable_ctx = NULL;
1938 }
1939
1940
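/*
  handler for "update ip" messages: only the recovery master acts on these,
  recording the reported public ip assignment in its ip assignment tree
 */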
1941 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1942                              TDB_DATA data, void *private_data)
1943 {
1944         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1945         struct ctdb_public_ip *ip;
1946
1947         if (rec->recmaster != rec->ctdb->pnn) {
1948                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1949                 return;
1950         }
1951
1952         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1953                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1954                 return;
1955         }
1956
1957         ip = (struct ctdb_public_ip *)data.dptr;
1958
1959         update_ip_assignment_tree(rec->ctdb, ip);
1960 }
1961
1962
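/*
  handler for "disable ip check" messages: suspend the public ip consistency
  check for the requested number of seconds by arming a timed event that
  re-enables it via reenable_ip_check()
 */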
1963 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1964                              TDB_DATA data, void *private_data)
1965 {
1966         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1967         uint32_t timeout;
1968
1969         if (rec->ip_check_disable_ctx != NULL) {
1970                 talloc_free(rec->ip_check_disable_ctx);
1971                 rec->ip_check_disable_ctx = NULL;
1972         }
1973
1974         if (data.dsize != sizeof(uint32_t)) {
1975                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data: %lu, "
1976                                  "expecting %lu\n", (long unsigned)data.dsize,
1977                                  (long unsigned)sizeof(uint32_t)));
1978                 return;
1979         }
1980         if (data.dptr == NULL) {
1981                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1982                 return;
1983         }
1984
1985         timeout = *((uint32_t *)data.dptr);
1986         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1987
1988         rec->ip_check_disable_ctx = talloc_new(rec);
1989         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1990
1991         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1992 }
1993
1994
1995 /*
1996   handler for ip reallocate, just add it to the list of callers and 
1997   handle this later in the monitor_cluster loop so we do not recurse
1998   with other callers to takeover_run()
1999 */
2000 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2001                              TDB_DATA data, void *private_data)
2002 {
2003         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2004         struct ip_reallocate_list *caller;
2005
2006         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2007                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2008                 return;
2009         }
2010
2011         if (rec->ip_reallocate_ctx == NULL) {
2012                 rec->ip_reallocate_ctx = talloc_new(rec);
2013                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2014         }
2015
2016         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2017         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2018
2019         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2020         caller->next = rec->reallocate_callers;
2021         rec->reallocate_callers = caller;
2022
2023         return;
2024 }
2025
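/*
  called from the monitor_cluster loop: perform a single takeover run on
  behalf of all queued "ctdb ipreallocate" callers and send every caller
  that asked for a reply (srvid != 0) the int32 result.

  Caller side, as a rough illustrative sketch only (the real client code
  lives in the ctdb tool; my_pnn and my_reply_srvid are placeholders):

        struct rd_memdump_reply rd = { .pnn = my_pnn, .srvid = my_reply_srvid };
        TDB_DATA data = { .dptr = (uint8_t *)&rd, .dsize = sizeof(rd) };
        ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
        ...then wait for the int32 result to arrive on my_reply_srvid...
 */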
2026 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2027 {
2028         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2029         TDB_DATA result;
2030         int32_t ret;
2031         struct ip_reallocate_list *callers;
2032         uint32_t culprit;
2033
2034         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2035
2036         /* update the list of public ips that a node can handle for
2037            all connected nodes
2038         */
2039         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2040         if (ret != 0) {
2041                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2042                                  culprit));
2043                 rec->need_takeover_run = true;
2044         }
2045         if (ret == 0) {
2046                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2047                 if (ret != 0) {
2048                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed\n"));
2050                         rec->need_takeover_run = true;
2051                 }
2052         }
2053
2054         result.dsize = sizeof(int32_t);
2055         result.dptr  = (uint8_t *)&ret;
2056
2057         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2058
2059                 /* Someone that sent srvid==0 does not want a reply */
2060                 if (callers->rd->srvid == 0) {
2061                         continue;
2062                 }
2063                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2064                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2065                                   (unsigned long long)callers->rd->srvid));
2066                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2067                 if (ret != 0) {
2068                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2069                                          "message to %u:%llu\n",
2070                                          (unsigned)callers->rd->pnn,
2071                                          (unsigned long long)callers->rd->srvid));
2072                 }
2073         }
2074
2075         talloc_free(tmp_ctx);
2076         talloc_free(rec->ip_reallocate_ctx);
2077         rec->ip_reallocate_ctx = NULL;
2078         rec->reallocate_callers = NULL;
2079         
2080 }
2081
2082
2083 /*
2084   handler for recovery master elections
2085 */
2086 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2087                              TDB_DATA data, void *private_data)
2088 {
2089         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2090         int ret;
2091         struct election_message *em = (struct election_message *)data.dptr;
2092         TALLOC_CTX *mem_ctx;
2093
2094         /* we got an election packet - update the timeout for the election */
2095         talloc_free(rec->election_timeout);
2096         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2097                                                 fast_start ?
2098                                                 timeval_current_ofs(0, 500000) :
2099                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2100                                                 ctdb_election_timeout, rec);
2101
2102         mem_ctx = talloc_new(ctdb);
2103
2104         /* someone called an election. check their election data
2105            and if we disagree and we would rather be the elected node, 
2106            send a new election message to all other nodes
2107          */
2108         if (ctdb_election_win(rec, em)) {
2109                 if (!rec->send_election_te) {
2110                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2111                                                                 timeval_current_ofs(0, 500000),
2112                                                                 election_send_request, rec);
2113                 }
2114                 talloc_free(mem_ctx);
2115                 /*unban_all_nodes(ctdb);*/
2116                 return;
2117         }
2118         
2119         /* we didn't win */
2120         talloc_free(rec->send_election_te);
2121         rec->send_election_te = NULL;
2122
2123         if (ctdb->tunable.verify_recovery_lock != 0) {
2124                 /* release the recmaster lock */
2125                 if (em->pnn != ctdb->pnn &&
2126                     ctdb->recovery_lock_fd != -1) {
2127                         close(ctdb->recovery_lock_fd);
2128                         ctdb->recovery_lock_fd = -1;
2129                         unban_all_nodes(ctdb);
2130                 }
2131         }
2132
2133         /* ok, let that guy become recmaster then */
2134         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2135         if (ret != 0) {
2136                 DEBUG(DEBUG_ERR, (__location__ " failed to set new recmaster on local node\n"));
2137                 talloc_free(mem_ctx);
2138                 return;
2139         }
2140
2141         talloc_free(mem_ctx);
2142         return;
2143 }
2144
2145
2146 /*
2147   force the start of the election process
2148  */
2149 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2150                            struct ctdb_node_map *nodemap)
2151 {
2152         int ret;
2153         struct ctdb_context *ctdb = rec->ctdb;
2154
2155         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2156
2157         /* set all nodes to recovery mode to stop all internode traffic */
2158         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2159         if (ret != 0) {
2160                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2161                 return;
2162         }
2163
2164         talloc_free(rec->election_timeout);
2165         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2166                                                 fast_start ?
2167                                                 timeval_current_ofs(0, 500000) :
2168                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2169                                                 ctdb_election_timeout, rec);
2170
2171         ret = send_election_request(rec, pnn, true);
2172         if (ret!=0) {
2173                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2174                 return;
2175         }
2176
2177         /* wait for a few seconds to collect all responses */
2178         ctdb_wait_election(rec);
2179 }
2180
2181
2182
2183 /*
2184   handler for when a node changes its flags
2185 */
2186 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2187                             TDB_DATA data, void *private_data)
2188 {
2189         int ret;
2190         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2191         struct ctdb_node_map *nodemap=NULL;
2192         TALLOC_CTX *tmp_ctx;
2193         uint32_t changed_flags;
2194         int i;
2195         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2196         int disabled_flag_changed;
2197
2198         if (data.dsize != sizeof(*c)) {
2199                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2200                 return;
2201         }
2202
2203         tmp_ctx = talloc_new(ctdb);
2204         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2205
2206         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2207         if (ret != 0) {
2208                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2209                 talloc_free(tmp_ctx);
2210                 return;         
2211         }
2212
2213
2214         for (i=0;i<nodemap->num;i++) {
2215                 if (nodemap->nodes[i].pnn == c->pnn) break;
2216         }
2217
2218         if (i == nodemap->num) {
2219                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2220                 talloc_free(tmp_ctx);
2221                 return;
2222         }
2223
2224         changed_flags = c->old_flags ^ c->new_flags;
2225
2226         if (nodemap->nodes[i].flags != c->new_flags) {
2227                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2228         }
2229
2230         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2231
2232         nodemap->nodes[i].flags = c->new_flags;
2233
2234         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2235                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2236
2237         if (ret == 0) {
2238                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2239                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2240         }
2241         
2242         if (ret == 0 &&
2243             ctdb->recovery_master == ctdb->pnn &&
2244             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2245                 /* Only do the takeover run if the perm disabled or unhealthy
2246                    flags changed since these will cause an ip failover but not
2247                    a recovery.
2248                    If the node became disconnected or banned this will also
2249                    lead to an ip address failover but that is handled 
2250                    during recovery
2251                 */
2252                 if (disabled_flag_changed) {
2253                         rec->need_takeover_run = true;
2254                 }
2255         }
2256
2257         talloc_free(tmp_ctx);
2258 }
2259
2260 /*
2261   handler for when we need to push out flag changes to all other nodes
2262 */
2263 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2264                             TDB_DATA data, void *private_data)
2265 {
2266         int ret;
2267         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2268         struct ctdb_node_map *nodemap=NULL;
2269         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2270         uint32_t recmaster;
2271         uint32_t *nodes;
2272
2273         /* find the recovery master */
2274         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2275         if (ret != 0) {
2276                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2277                 talloc_free(tmp_ctx);
2278                 return;
2279         }
2280
2281         /* read the node flags from the recmaster */
2282         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2283         if (ret != 0) {
2284                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", recmaster));
2285                 talloc_free(tmp_ctx);
2286                 return;
2287         }
2288         if (c->pnn >= nodemap->num) {
2289                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2290                 talloc_free(tmp_ctx);
2291                 return;
2292         }
2293
2294         /* send the flags update to all connected nodes */
2295         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2296
2297         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2298                                       nodes, 0, CONTROL_TIMEOUT(),
2299                                       false, data,
2300                                       NULL, NULL,
2301                                       NULL) != 0) {
2302                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2303
2304                 talloc_free(tmp_ctx);
2305                 return;
2306         }
2307
2308         talloc_free(tmp_ctx);
2309 }
2310
2311
2312 struct verify_recmode_normal_data {
2313         uint32_t count;
2314         enum monitor_result status;
2315 };
2316
2317 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2318 {
2319         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2320
2321
2322         /* one more node has responded with recmode data*/
2323         rmdata->count--;
2324
2325         /* if we failed to get the recmode, then return an error and let
2326            the main loop try again.
2327         */
2328         if (state->state != CTDB_CONTROL_DONE) {
2329                 if (rmdata->status == MONITOR_OK) {
2330                         rmdata->status = MONITOR_FAILED;
2331                 }
2332                 return;
2333         }
2334
2335         /* if we got a response, then the recmode will be stored in the
2336            status field
2337         */
2338         if (state->status != CTDB_RECOVERY_NORMAL) {
2339                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2340                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2341         }
2342
2343         return;
2344 }
2345
2346
2347 /* verify that all nodes are in normal recovery mode */
2348 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2349 {
2350         struct verify_recmode_normal_data *rmdata;
2351         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2352         struct ctdb_client_control_state *state;
2353         enum monitor_result status;
2354         int j;
2355         
2356         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2357         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2358         rmdata->count  = 0;
2359         rmdata->status = MONITOR_OK;
2360
2361         /* loop over all active nodes and send an async getrecmode call to 
2362            them*/
2363         for (j=0; j<nodemap->num; j++) {
2364                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2365                         continue;
2366                 }
2367                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2368                                         CONTROL_TIMEOUT(), 
2369                                         nodemap->nodes[j].pnn);
2370                 if (state == NULL) {
2371                         /* we failed to send the control, treat this as 
2372                            an error and try again next iteration
2373                         */                      
2374                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2375                         talloc_free(mem_ctx);
2376                         return MONITOR_FAILED;
2377                 }
2378
2379                 /* set up the callback functions */
2380                 state->async.fn = verify_recmode_normal_callback;
2381                 state->async.private_data = rmdata;
2382
2383                 /* one more control to wait for to complete */
2384                 rmdata->count++;
2385         }
2386
2387
2388         /* now wait for up to the maximum number of seconds allowed
2389            or until all nodes we expect a response from have replied
2390         */
2391         while (rmdata->count > 0) {
2392                 event_loop_once(ctdb->ev);
2393         }
2394
2395         status = rmdata->status;
2396         talloc_free(mem_ctx);
2397         return status;
2398 }
2399
2400
2401 struct verify_recmaster_data {
2402         struct ctdb_recoverd *rec;
2403         uint32_t count;
2404         uint32_t pnn;
2405         enum monitor_result status;
2406 };
2407
2408 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2409 {
2410         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2411
2412
2413         /* one more node has responded with recmaster data*/
2414         rmdata->count--;
2415
2416         /* if we failed to get the recmaster, then return an error and let
2417            the main loop try again.
2418         */
2419         if (state->state != CTDB_CONTROL_DONE) {
2420                 if (rmdata->status == MONITOR_OK) {
2421                         rmdata->status = MONITOR_FAILED;
2422                 }
2423                 return;
2424         }
2425
2426         /* if we got a response, then the recmaster will be stored in the
2427            status field
2428         */
2429         if (state->status != rmdata->pnn) {
2430                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2431                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2432                 rmdata->status = MONITOR_ELECTION_NEEDED;
2433         }
2434
2435         return;
2436 }
2437
2438
2439 /* verify that all nodes agree that we are the recmaster */
2440 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2441 {
2442         struct ctdb_context *ctdb = rec->ctdb;
2443         struct verify_recmaster_data *rmdata;
2444         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2445         struct ctdb_client_control_state *state;
2446         enum monitor_result status;
2447         int j;
2448         
2449         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2450         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2451         rmdata->rec    = rec;
2452         rmdata->count  = 0;
2453         rmdata->pnn    = pnn;
2454         rmdata->status = MONITOR_OK;
2455
2456         /* loop over all active nodes and send an async getrecmaster call to 
2457            them*/
2458         for (j=0; j<nodemap->num; j++) {
2459                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2460                         continue;
2461                 }
2462                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2463                                         CONTROL_TIMEOUT(),
2464                                         nodemap->nodes[j].pnn);
2465                 if (state == NULL) {
2466                         /* we failed to send the control, treat this as 
2467                            an error and try again next iteration
2468                         */                      
2469                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2470                         talloc_free(mem_ctx);
2471                         return MONITOR_FAILED;
2472                 }
2473
2474                 /* set up the callback functions */
2475                 state->async.fn = verify_recmaster_callback;
2476                 state->async.private_data = rmdata;
2477
2478                 /* one more control to wait for to complete */
2479                 rmdata->count++;
2480         }
2481
2482
2483         /* now wait for up to the maximum number of seconds allowed
2484            or until all nodes we expect a response from have replied
2485         */
2486         while (rmdata->count > 0) {
2487                 event_loop_once(ctdb->ev);
2488         }
2489
2490         status = rmdata->status;
2491         talloc_free(mem_ctx);
2492         return status;
2493 }
2494
2495
2496 /* called to check that the local allocation of public ip addresses is ok.
2497 */
2498 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2499 {
2500         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2501         struct ctdb_control_get_ifaces *ifaces = NULL;
2502         struct ctdb_all_public_ips *ips = NULL;
2503         struct ctdb_uptime *uptime1 = NULL;
2504         struct ctdb_uptime *uptime2 = NULL;
2505         int ret, j;
2506         bool need_iface_check = false;
2507         bool need_takeover_run = false;
2508
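        /* read the uptime (which carries the last recovery timestamps)
           both before and after fetching the interface and public ip
           information, so we can detect and skip a snapshot taken while
           a recovery or ip reallocation was in progress
        */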
2509         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2510                                 CTDB_CURRENT_NODE, &uptime1);
2511         if (ret != 0) {
2512                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2513                 talloc_free(mem_ctx);
2514                 return -1;
2515         }
2516
2517
2518         /* read the interfaces from the local node */
2519         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2520         if (ret != 0) {
2521                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2522                 talloc_free(mem_ctx);
2523                 return -1;
2524         }
2525
2526         if (!rec->ifaces) {
2527                 need_iface_check = true;
2528         } else if (rec->ifaces->num != ifaces->num) {
2529                 need_iface_check = true;
2530         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2531                 need_iface_check = true;
2532         }
2533
2534         if (need_iface_check) {
2535                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2536                                      "local node %u - force takeover run\n",
2537                                      pnn));
2538                 need_takeover_run = true;
2539         }
2540
2541         /* read the ip allocation from the local node */
2542         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2543         if (ret != 0) {
2544                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2545                 talloc_free(mem_ctx);
2546                 return -1;
2547         }
2548
2549         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2550                                 CTDB_CURRENT_NODE, &uptime2);
2551         if (ret != 0) {
2552                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2553                 talloc_free(mem_ctx);
2554                 return -1;
2555         }
2556
2557         /* skip the check if the startrecovery time has changed */
2558         if (timeval_compare(&uptime1->last_recovery_started,
2559                             &uptime2->last_recovery_started) != 0) {
2560                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2561                 talloc_free(mem_ctx);
2562                 return 0;
2563         }
2564
2565         /* skip the check if the endrecovery time has changed */
2566         if (timeval_compare(&uptime1->last_recovery_finished,
2567                             &uptime2->last_recovery_finished) != 0) {
2568                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2569                 talloc_free(mem_ctx);
2570                 return 0;
2571         }
2572
2573         /* skip the check if we have started but not finished recovery */
2574         if (timeval_compare(&uptime1->last_recovery_finished,
2575                             &uptime1->last_recovery_started) != 1) {
2576                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2577                 talloc_free(mem_ctx);
2578
2579                 return 0;
2580         }
2581
2582         talloc_free(rec->ifaces);
2583         rec->ifaces = talloc_steal(rec, ifaces);
2584
2585         /* verify that we have the ip addresses we should have
2586            and we don't have ones we shouldn't have.
2587            if we find an inconsistency we ask the recovery
2588            master to perform a takeover run rather than
2589            fixing it up locally.
2590            also if an address is unassigned (pnn == -1) and we are
2591            healthy and could host it, we request an ip reallocation.
2592         */
2593         if (ctdb->tunable.disable_ip_failover == 0) {
2594                 for (j=0; j<ips->num; j++) {
2595                         if (ips->ips[j].pnn == -1 && nodemap->nodes[pnn].flags == 0) {
2596                                 DEBUG(DEBUG_CRIT,("Public address '%s' is not assigned and we could serve this ip\n",
2597                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2598                                 need_takeover_run = true;
2599                         } else if (ips->ips[j].pnn == pnn) {
2600                                 if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2601                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2602                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2603                                         need_takeover_run = true;
2604                                 }
2605                         } else {
2606                                 if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2607                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2608                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2609                                         need_takeover_run = true;
2610                                 }
2611                         }
2612                 }
2613         }
2614
2615         if (need_takeover_run) {
2616                 struct takeover_run_reply rd;
2617                 TDB_DATA data;
2618
2619                 DEBUG(DEBUG_CRIT,("Trigger takeover run\n"));
2620
2621                 rd.pnn = ctdb->pnn;
2622                 rd.srvid = 0;
2623                 data.dptr = (uint8_t *)&rd;
2624                 data.dsize = sizeof(rd);
2625
2626                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2627                 if (ret != 0) {
2628                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster %d\n", (int)rec->recmaster));
2629                 }
2630         }
2631         talloc_free(mem_ctx);
2632         return 0;
2633 }
2634
2635
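/*
  async callback for GET_NODEMAP: stash the nodemap returned by node
  node_pnn in the remote_nodemaps array passed as callback_data
 */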
2636 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2637 {
2638         struct ctdb_node_map **remote_nodemaps = callback_data;
2639
2640         if (node_pnn >= ctdb->num_nodes) {
2641                 DEBUG(DEBUG_ERR,(__location__ " nodemap reply from invalid node %u\n", node_pnn));
2642                 return;
2643         }
2644
2645         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2646
2647 }
2648
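/*
  pull the nodemap from every active node; each reply is stored, indexed
  by pnn, in the remote_nodemaps array via async_getnodemap_callback()
 */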
2649 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2650         struct ctdb_node_map *nodemap,
2651         struct ctdb_node_map **remote_nodemaps)
2652 {
2653         uint32_t *nodes;
2654
2655         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2656         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2657                                         nodes, 0,
2658                                         CONTROL_TIMEOUT(), false, tdb_null,
2659                                         async_getnodemap_callback,
2660                                         NULL,
2661                                         remote_nodemaps) != 0) {
2662                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2663
2664                 return -1;
2665         }
2666
2667         return 0;
2668 }
2669
2670 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2671 struct ctdb_check_reclock_state {
2672         struct ctdb_context *ctdb;
2673         struct timeval start_time;
2674         int fd[2];
2675         pid_t child;
2676         struct timed_event *te;
2677         struct fd_event *fde;
2678         enum reclock_child_status status;
2679 };
2680
2681 /* when we free the reclock state we must kill any child process.
2682 */
2683 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2684 {
2685         struct ctdb_context *ctdb = state->ctdb;
2686
2687         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2688
2689         if (state->fd[0] != -1) {
2690                 close(state->fd[0]);
2691                 state->fd[0] = -1;
2692         }
2693         if (state->fd[1] != -1) {
2694                 close(state->fd[1]);
2695                 state->fd[1] = -1;
2696         }
2697         kill(state->child, SIGKILL);
2698         return 0;
2699 }
2700
2701 /*
2702   called if our check_reclock child times out. this would happen if
2703   i/o to the reclock file blocks.
2704  */
2705 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2706                                          struct timeval t, void *private_data)
2707 {
2708         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2709                                            struct ctdb_check_reclock_state);
2710
2711         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out. Is the cluster filesystem slow to grant locks?\n"));
2712         state->status = RECLOCK_TIMEOUT;
2713 }
2714
2715 /* this is called when the child process has completed checking the reclock
2716    file and has written data back to us through the pipe.
2717 */
2718 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2719                              uint16_t flags, void *private_data)
2720 {
2721         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2722                                              struct ctdb_check_reclock_state);
2723         char c = 0;
2724         int ret;
2725
2726         /* we got a response from our child process so we can abort the
2727            timeout.
2728         */
2729         talloc_free(state->te);
2730         state->te = NULL;
2731
2732         ret = read(state->fd[0], &c, 1);
2733         if (ret != 1 || c != RECLOCK_OK) {
2734                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2735                 state->status = RECLOCK_FAILED;
2736
2737                 return;
2738         }
2739
2740         state->status = RECLOCK_OK;
2741         return;
2742 }
2743
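/*
  verify that the recovery lock file is still accessible.
  fork a child that does a pread() on the already open recovery_lock_fd
  and wait, with a 15 second timeout, for it to report back over a pipe.
  returns -1 if the child reported a read failure, 0 otherwise.
 */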
2744 static int check_recovery_lock(struct ctdb_context *ctdb)
2745 {
2746         int ret;
2747         struct ctdb_check_reclock_state *state;
2748         pid_t parent = getpid();
2749
2750         if (ctdb->recovery_lock_fd == -1) {
2751                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2752                 return -1;
2753         }
2754
2755         state = talloc(ctdb, struct ctdb_check_reclock_state);
2756         CTDB_NO_MEMORY(ctdb, state);
2757
2758         state->ctdb = ctdb;
2759         state->start_time = timeval_current();
2760         state->status = RECLOCK_CHECKING;
2761         state->fd[0] = -1;
2762         state->fd[1] = -1;
2763
2764         ret = pipe(state->fd);
2765         if (ret != 0) {
2766                 talloc_free(state);
2767                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2768                 return -1;
2769         }
2770
2771         state->child = ctdb_fork(ctdb);
2772         if (state->child == (pid_t)-1) {
2773                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2774                 close(state->fd[0]);
2775                 state->fd[0] = -1;
2776                 close(state->fd[1]);
2777                 state->fd[1] = -1;
2778                 talloc_free(state);
2779                 return -1;
2780         }
2781
2782         if (state->child == 0) {
2783                 char cc = RECLOCK_OK;
2784                 close(state->fd[0]);
2785                 state->fd[0] = -1;
2786
2787                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
2788                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2789                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2790                         cc = RECLOCK_FAILED;
2791                 }
2792
2793                 write(state->fd[1], &cc, 1);
2794                 /* make sure we die when our parent dies */
2795                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2796                         sleep(5);
2797                         write(state->fd[1], &cc, 1);
2798                 }
2799                 _exit(0);
2800         }
2801         close(state->fd[1]);
2802         state->fd[1] = -1;
2803         set_close_on_exec(state->fd[0]);
2804
2805         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2806
2807         talloc_set_destructor(state, check_reclock_destructor);
2808
2809         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2810                                     ctdb_check_reclock_timeout, state);
2811         if (state->te == NULL) {
2812                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2813                 talloc_free(state);
2814                 return -1;
2815         }
2816
2817         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2818                                 EVENT_FD_READ,
2819                                 reclock_child_handler,
2820                                 (void *)state);
2821
2822         if (state->fde == NULL) {
2823                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2824                 talloc_free(state);
2825                 return -1;
2826         }
2827         tevent_fd_set_auto_close(state->fde);
2828
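        /* run the event loop until the child has reported a result or
           the timeout handler has fired */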
2829         while (state->status == RECLOCK_CHECKING) {
2830                 event_loop_once(ctdb->ev);
2831         }
2832
2833         if (state->status == RECLOCK_FAILED) {
2834                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2835                 close(ctdb->recovery_lock_fd);
2836                 ctdb->recovery_lock_fd = -1;
2837                 talloc_free(state);
2838                 return -1;
2839         }
2840
2841         talloc_free(state);
2842         return 0;
2843 }
2844
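/*
  fetch the current recovery lock file setting from the local daemon and
  update the recovery daemon's copy of it. if the setting has changed or
  been disabled, close any previously opened recovery lock fd.
 */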
2845 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2846 {
2847         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2848         const char *reclockfile;
2849
2850         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2851                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2852                 talloc_free(tmp_ctx);
2853                 return -1;      
2854         }
2855
2856         if (reclockfile == NULL) {
2857                 if (ctdb->recovery_lock_file != NULL) {
2858                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2859                         talloc_free(ctdb->recovery_lock_file);
2860                         ctdb->recovery_lock_file = NULL;
2861                         if (ctdb->recovery_lock_fd != -1) {
2862                                 close(ctdb->recovery_lock_fd);
2863                                 ctdb->recovery_lock_fd = -1;
2864                         }
2865                 }
2866                 ctdb->tunable.verify_recovery_lock = 0;
2867                 talloc_free(tmp_ctx);
2868                 return 0;
2869         }
2870
2871         if (ctdb->recovery_lock_file == NULL) {
2872                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2873                 if (ctdb->recovery_lock_fd != -1) {
2874                         close(ctdb->recovery_lock_fd);
2875                         ctdb->recovery_lock_fd = -1;
2876                 }
2877                 talloc_free(tmp_ctx);
2878                 return 0;
2879         }
2880
2881
2882         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2883                 talloc_free(tmp_ctx);
2884                 return 0;
2885         }
2886
2887         talloc_free(ctdb->recovery_lock_file);
2888         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2889         ctdb->tunable.verify_recovery_lock = 0;
2890         if (ctdb->recovery_lock_fd != -1) {
2891                 close(ctdb->recovery_lock_fd);
2892                 ctdb->recovery_lock_fd = -1;
2893         }
2894
2895         talloc_free(tmp_ctx);
2896         return 0;
2897 }
2898
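/*
  one iteration of the recovery daemon main loop: ping the local daemon,
  refresh tunables, nodemap and vnnmap, make sure a recovery master is
  elected and, if we are the recovery master, verify cluster consistency
  and trigger a recovery or takeover run when needed.
 */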
2899 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2900                       TALLOC_CTX *mem_ctx)
2901 {
2902         uint32_t pnn;
2903         struct ctdb_node_map *nodemap=NULL;
2904         struct ctdb_node_map *recmaster_nodemap=NULL;
2905         struct ctdb_node_map **remote_nodemaps=NULL;
2906         struct ctdb_vnn_map *vnnmap=NULL;
2907         struct ctdb_vnn_map *remote_vnnmap=NULL;
2908         int32_t debug_level;
2909         int i, j, ret;
2910
2911
2912
2913         /* verify that the main daemon is still running */
2914         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2915                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2916                 exit(-1);
2917         }
2918
2919         /* ping the local daemon to tell it we are alive */
2920         ctdb_ctrl_recd_ping(ctdb);
2921
2922         if (rec->election_timeout) {
2923                 /* an election is in progress */
2924                 return;
2925         }
2926
2927         /* read the debug level from the parent and update locally */
2928         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2929         if (ret !=0) {
2930                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2931                 return;
2932         }
2933         LogLevel = debug_level;
2934
2935
2936         /* We must check if we need to ban a node here, and we want to do it
2937            as early as possible, so we don't wait until we have pulled the
2938            node map from the local node. That is why the value 20 is hardcoded.
2939         */
2940         for (i=0; i<ctdb->num_nodes; i++) {
2941                 struct ctdb_banning_state *ban_state;
2942
2943                 if (ctdb->nodes[i]->ban_state == NULL) {
2944                         continue;
2945                 }
2946                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2947                 if (ban_state->count < 20) {
2948                         continue;
2949                 }
2950                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2951                         ctdb->nodes[i]->pnn, ban_state->count,
2952                         ctdb->tunable.recovery_ban_period));
2953                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2954                 ban_state->count = 0;
2955         }
2956
2957         /* get relevant tunables */
2958         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2959         if (ret != 0) {
2960                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2961                 return;
2962         }
2963
2964         /* get the current recovery lock file from the server */
2965         if (update_recovery_lock_file(ctdb) != 0) {
2966                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2967                 return;
2968         }
2969
2970         /* Make sure that if recovery lock verification becomes disabled,
2971            we close the recovery lock file
2972         */
2973         if (ctdb->tunable.verify_recovery_lock == 0) {
2974                 if (ctdb->recovery_lock_fd != -1) {
2975                         close(ctdb->recovery_lock_fd);
2976                         ctdb->recovery_lock_fd = -1;
2977                 }
2978         }
2979
2980         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2981         if (pnn == (uint32_t)-1) {
2982                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2983                 return;
2984         }
2985
2986         /* get the vnnmap */
2987         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2988         if (ret != 0) {
2989                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2990                 return;
2991         }
2992
2993
2994         /* get number of nodes */
2995         if (rec->nodemap) {
2996                 talloc_free(rec->nodemap);
2997                 rec->nodemap = NULL;
2998                 nodemap=NULL;
2999         }
3000         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3001         if (ret != 0) {
3002                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3003                 return;
3004         }
3005         nodemap = rec->nodemap;
3006
3007         /* check which node is the recovery master */
3008         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3009         if (ret != 0) {
3010                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3011                 return;
3012         }
3013
3014         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3015         if (rec->recmaster != pnn) {
3016                 if (rec->ip_reallocate_ctx != NULL) {
3017                         talloc_free(rec->ip_reallocate_ctx);
3018                         rec->ip_reallocate_ctx = NULL;
3019                         rec->reallocate_callers = NULL;
3020                 }
3021         }
3022         /* if there are takeover runs requested, perform them and notify the waiters */
3023         if (rec->reallocate_callers) {
3024                 process_ipreallocate_requests(ctdb, rec);
3025         }
3026
3027         if (rec->recmaster == (uint32_t)-1) {
3028                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master not yet set - forcing election\n"));
3029                 force_election(rec, pnn, nodemap);
3030                 return;
3031         }
3032
3033
3034         /* if the local daemon is STOPPED, we verify that the databases are
3035            also frozen and that the recovery mode is set to active
3036         */
3037         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3038                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3039                 if (ret != 0) {
3040                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3041                 }
3042                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3043                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3044
3045                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3046                         if (ret != 0) {
3047                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3048                                 return;
3049                         }
3050                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3051                         if (ret != 0) {
3052                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3053
3054                                 return;
3055                         }
3056                         return;
3057                 }
3058         }
3059         /* If the local node is stopped and we are the recovery master,
3060            yield that role
3061         */
3062         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3063                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3064                 force_election(rec, pnn, nodemap);
3065                 return;
3066         }
3067         
3068         /* check that we (recovery daemon) and the local ctdb daemon
3069            agree on whether we are banned or not
3070         */
3071 //qqq
3072
3073         /* remember our own node flags */
3074         rec->node_flags = nodemap->nodes[pnn].flags;
3075
3076         /* count how many active nodes there are */
3077         rec->num_active    = 0;
3078         rec->num_connected = 0;
3079         for (i=0; i<nodemap->num; i++) {
3080                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3081                         rec->num_active++;
3082                 }
3083                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3084                         rec->num_connected++;
3085                 }
3086         }
3087
3088
3089         /* verify that the recmaster node is still active */
3090         for (j=0; j<nodemap->num; j++) {
3091                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3092                         break;
3093                 }
3094         }
3095
3096         if (j == nodemap->num) {
3097                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3098                 force_election(rec, pnn, nodemap);
3099                 return;
3100         }
3101
3102         /* if recovery master is disconnected we must elect a new recmaster */
3103         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3104                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3105                 force_election(rec, pnn, nodemap);
3106                 return;
3107         }
3108
3109         /* grab the nodemap from the recovery master to check if it is banned */
3110         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3111                                    mem_ctx, &recmaster_nodemap);
3112         if (ret != 0) {
3113                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3114                           nodemap->nodes[j].pnn));
3115                 return;
3116         }
3117
3118
3119         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3120                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3121                 force_election(rec, pnn, nodemap);
3122                 return;
3123         }
3124
3125
3126         /* verify that we have all the ip addresses we should have and that
3127          * we don't have addresses we shouldn't have.
3128          */
3129         if (ctdb->tunable.disable_ip_failover == 0) {
3130                 if (rec->ip_check_disable_ctx == NULL) {
3131                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3132                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3133                         }
3134                 }
3135         }
3136
3137
3138         /* if we are not the recmaster then we do not need to check
3139            if recovery is needed
3140          */
3141         if (pnn != rec->recmaster) {
3142                 return;
3143         }
3144
3145
3146         /* ensure our local copies of flags are right */
3147         ret = update_local_flags(rec, nodemap);
3148         if (ret == MONITOR_ELECTION_NEEDED) {
3149                 DEBUG(DEBUG_NOTICE,("update_local_flags() requested a re-election.\n"));
3150                 force_election(rec, pnn, nodemap);
3151                 return;
3152         }
3153         if (ret != MONITOR_OK) {
3154                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3155                 return;
3156         }
3157
3158         if (ctdb->num_nodes != nodemap->num) {
3159                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3160                 reload_nodes_file(ctdb);
3161                 return;
3162         }
3163
3164         /* verify that all active nodes agree that we are the recmaster */
3165         switch (verify_recmaster(rec, nodemap, pnn)) {
3166         case MONITOR_RECOVERY_NEEDED:
3167                 /* can not happen */
3168                 return;
3169         case MONITOR_ELECTION_NEEDED:
3170                 force_election(rec, pnn, nodemap);
3171                 return;
3172         case MONITOR_OK:
3173                 break;
3174         case MONITOR_FAILED:
3175                 return;
3176         }
3177
3178
3179         if (rec->need_recovery) {
3180                 /* a previous recovery didn't finish */
3181                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3182                 return;
3183         }
3184
3185         /* verify that all active nodes are in normal mode 
3186            and not in recovery mode 
3187         */
3188         switch (verify_recmode(ctdb, nodemap)) {
3189         case MONITOR_RECOVERY_NEEDED:
3190                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3191                 return;
3192         case MONITOR_FAILED:
3193                 return;
3194         case MONITOR_ELECTION_NEEDED:
3195                 /* can not happen */
3196         case MONITOR_OK:
3197                 break;
3198         }
3199
3200
3201         if (ctdb->tunable.verify_recovery_lock != 0) {
3202                 /* we should have the reclock - check its not stale */
3203                 ret = check_recovery_lock(ctdb);
3204                 if (ret != 0) {
3205                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3206                         ctdb_set_culprit(rec, ctdb->pnn);
3207                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3208                         return;
3209                 }
3210         }
3211
3212         /* get the nodemap for all active remote nodes
3213          */
3214         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3215         if (remote_nodemaps == NULL) {
3216                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3217                 return;
3218         }
3219         for(i=0; i<nodemap->num; i++) {
3220                 remote_nodemaps[i] = NULL;
3221         }
3222         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3223                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3224                 return;
3225         } 
3226
3227         /* verify that all other nodes have the same nodemap as we have
3228         */
3229         for (j=0; j<nodemap->num; j++) {
3230                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3231                         continue;
3232                 }
3233
3234                 if (remote_nodemaps[j] == NULL) {
3235                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3236                         ctdb_set_culprit(rec, j);
3237
3238                         return;
3239                 }
3240
3241                 /* if the nodes disagree on how many nodes there are
3242                    then this is a good reason to try recovery
3243                  */
3244                 if (remote_nodemaps[j]->num != nodemap->num) {
3245                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3246                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3247                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3248                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3249                         return;
3250                 }
3251
3252                 /* if the nodes disagree on which nodes exist and are
3253                    active, then that is also a good reason to do recovery
3254                  */
3255                 for (i=0;i<nodemap->num;i++) {
3256                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3257                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3258                                           nodemap->nodes[j].pnn, i, 
3259                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3260                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3261                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3262                                             vnnmap);
3263                                 return;
3264                         }
3265                 }
3266
3267                 /* verify the flags are consistent
3268                 */
3269                 for (i=0; i<nodemap->num; i++) {
3270                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3271                                 continue;
3272                         }
3273                         
3274                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3275                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3276                                   nodemap->nodes[j].pnn, 
3277                                   nodemap->nodes[i].pnn, 
3278                                   remote_nodemaps[j]->nodes[i].flags,
3279                                   nodemap->nodes[i].flags));
3280                                 if (i == j) {
3281                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3282                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3283                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3284                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3285                                                     vnnmap);
3286                                         return;
3287                                 } else {
3288                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3289                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3290                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3291                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3292                                                     vnnmap);
3293                                         return;
3294                                 }
3295                         }
3296                 }
3297         }
3298
3299
3300         /* there better be the same number of lmasters in the vnn map
3301            as there are active nodes or we will have to do a recovery
3302          */
3303         if (vnnmap->size != rec->num_active) {
3304                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3305                           vnnmap->size, rec->num_active));
3306                 ctdb_set_culprit(rec, ctdb->pnn);
3307                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3308                 return;
3309         }
3310
3311         /* verify that all active nodes in the nodemap also exist in 
3312            the vnnmap.
3313          */
3314         for (j=0; j<nodemap->num; j++) {
3315                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3316                         continue;
3317                 }
3318                 if (nodemap->nodes[j].pnn == pnn) {
3319                         continue;
3320                 }
3321
3322                 for (i=0; i<vnnmap->size; i++) {
3323                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3324                                 break;
3325                         }
3326                 }
3327                 if (i == vnnmap->size) {
3328                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3329                                   nodemap->nodes[j].pnn));
3330                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3331                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3332                         return;
3333                 }
3334         }
3335
3336         
3337         /* verify that all other nodes have the same vnnmap
3338            and are from the same generation
3339          */
3340         for (j=0; j<nodemap->num; j++) {
3341                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3342                         continue;
3343                 }
3344                 if (nodemap->nodes[j].pnn == pnn) {
3345                         continue;
3346                 }
3347
3348                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3349                                           mem_ctx, &remote_vnnmap);
3350                 if (ret != 0) {
3351                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3352                                   nodemap->nodes[j].pnn));
3353                         return;
3354                 }
3355
3356                 /* verify the vnnmap generation is the same */
3357                 if (vnnmap->generation != remote_vnnmap->generation) {
3358                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3359                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3360                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3361                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3362                         return;
3363                 }
3364
3365                 /* verify the vnnmap size is the same */
3366                 if (vnnmap->size != remote_vnnmap->size) {
3367                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3368                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3369                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3370                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3371                         return;
3372                 }
3373
3374                 /* verify the vnnmap is the same */
3375                 for (i=0;i<vnnmap->size;i++) {
3376                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3377                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3378                                           nodemap->nodes[j].pnn));
3379                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3380                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3381                                             vnnmap);
3382                                 return;
3383                         }
3384                 }
3385         }
3386
3387         /* we might need to change who has what IP assigned */
3388         if (rec->need_takeover_run) {
3389                 uint32_t culprit = (uint32_t)-1;
3390
3391                 rec->need_takeover_run = false;
3392
3393                 /* update the list of public ips that a node can handle for
3394                    all connected nodes
3395                 */
3396                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3397                 if (ret != 0) {
3398                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3399                                          culprit));
3400                         ctdb_set_culprit(rec, culprit);
3401                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3402                         return;
3403                 }
3404
3405                 /* execute the "startrecovery" event script on all nodes */
3406                 ret = run_startrecovery_eventscript(rec, nodemap);
3407                 if (ret!=0) {
3408                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3409                         ctdb_set_culprit(rec, ctdb->pnn);
3410                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3411                         return;
3412                 }
3413
3414                 ret = ctdb_takeover_run(ctdb, nodemap);
3415                 if (ret != 0) {
3416                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3417                         ctdb_set_culprit(rec, ctdb->pnn);
3418                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3419                         return;
3420                 }
3421
3422                 /* execute the "recovered" event script on all nodes */
3423                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3424 #if 0
3425 // we can't check whether the event completed successfully
3426 // since this script WILL fail if the node is in recovery mode,
3427 // and if that race happens, the code here would just cause a second
3428 // cascading recovery.
3429                 if (ret!=0) {
3430                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3431                         ctdb_set_culprit(rec, ctdb->pnn);
3432                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3433                 }
3434 #endif
3435         }
3436 }
3437
3438 /*
3439   the main monitoring loop
3440  */
3441 static void monitor_cluster(struct ctdb_context *ctdb)
3442 {
3443         struct ctdb_recoverd *rec;
3444
3445         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3446
3447         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3448         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3449
3450         rec->ctdb = ctdb;
3451
3452         rec->priority_time = timeval_current();
3453
3454         /* register a message port for sending memory dumps */
3455         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3456
3457         /* register a message port for recovery elections */
3458         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3459
3460         /* when nodes are disabled/enabled */
3461         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3462
3463         /* when we are asked to push out a flag change */
3464         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3465
3466         /* register a message port for vacuum fetch */
3467         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3468
3469         /* register a message port for reloadnodes  */
3470         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3471
3472         /* register a message port for performing a takeover run */
3473         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3474
3475         /* register a message port for disabling the ip check for a short while */
3476         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3477
3478         /* register a message port for updating the recovery daemons node assignment for an ip */
3479         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3480
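        /* loop forever, running main_loop() at most once per
           recover_interval seconds */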
3481         for (;;) {
3482                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3483                 struct timeval start;
3484                 double elapsed;
3485
3486                 if (!mem_ctx) {
3487                         DEBUG(DEBUG_CRIT,(__location__
3488                                           " Failed to create temp context\n"));
3489                         exit(-1);
3490                 }
3491
3492                 start = timeval_current();
3493                 main_loop(ctdb, rec, mem_ctx);
3494                 talloc_free(mem_ctx);
3495
3496                 /* we only check for recovery once every second */
3497                 elapsed = timeval_elapsed(&start);
3498                 if (elapsed < ctdb->tunable.recover_interval) {
3499                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3500                                           - elapsed);
3501                 }
3502         }
3503 }
3504
3505 /*
3506   event handler for when the main ctdbd dies
3507  */
3508 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3509                                  uint16_t flags, void *private_data)
3510 {
3511         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3512         _exit(1);
3513 }
3514
3515 /*
3516   called regularly to verify that the recovery daemon is still running
3517  */
3518 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3519                               struct timeval yt, void *p)
3520 {
3521         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3522
3523         if (kill(ctdb->recoverd_pid, 0) != 0) {
3524                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3525
3526                 ctdb_stop_recoverd(ctdb);
3527                 ctdb_stop_keepalive(ctdb);
3528                 ctdb_stop_monitoring(ctdb);
3529                 ctdb_release_all_ips(ctdb);
3530                 if (ctdb->methods != NULL) {
3531                         ctdb->methods->shutdown(ctdb);
3532                 }
3533                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3534
3535                 exit(10);       
3536         }
3537
3538         event_add_timed(ctdb->ev, ctdb, 
3539                         timeval_current_ofs(30, 0),
3540                         ctdb_check_recd, ctdb);
3541 }
3542
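/*
  SIGCHLD handler for the recovery daemon: reap all exited child
  processes so they do not become zombies.
 */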
3543 static void recd_sig_child_handler(struct event_context *ev,
3544         struct signal_event *se, int signum, int count,
3545         void *dont_care, 
3546         void *private_data)
3547 {
3548 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3549         int status;
3550         pid_t pid = -1;
3551
3552         while (pid != 0) {
3553                 pid = waitpid(-1, &status, WNOHANG);
3554                 if (pid == -1) {
3555                         if (errno != ECHILD) {
3556                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3557                         }
3558                         return;
3559                 }
3560                 if (pid > 0) {
3561                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3562                 }
3563         }
3564 }
3565
3566 /*
3567   start up the recovery daemon as a child of the main ctdb daemon
3568  */
3569 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3570 {
3571         int fd[2];
3572         struct signal_event *se;
3573         struct tevent_fd *fde;
3574
3575         if (pipe(fd) != 0) {
3576                 return -1;
3577         }
3578
3579         ctdb->ctdbd_pid = getpid();
3580
3581         ctdb->recoverd_pid = fork();
3582         if (ctdb->recoverd_pid == -1) {
3583                 return -1;
3584         }
3585         
3586         if (ctdb->recoverd_pid != 0) {
3587                 close(fd[0]);
3588                 event_add_timed(ctdb->ev, ctdb, 
3589                                 timeval_current_ofs(30, 0),
3590                                 ctdb_check_recd, ctdb);
3591                 return 0;
3592         }
3593
3594         close(fd[1]);
3595
3596         srandom(getpid() ^ time(NULL));
3597
3598         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3599                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. Shutting down.\n"));
3600                 exit(1);
3601         }
3602
3603         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3604
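        /* the parent keeps the write end of the pipe open; when the main
           daemon exits the read end becomes readable (EOF) and
           ctdb_recoverd_parent() shuts the recovery daemon down */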
3605         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3606                      ctdb_recoverd_parent, &fd[0]);     
3607         tevent_fd_set_auto_close(fde);
3608
3609         /* set up a handler to pick up sigchld */
3610         se = event_add_signal(ctdb->ev, ctdb,
3611                                      SIGCHLD, 0,
3612                                      recd_sig_child_handler,
3613                                      ctdb);
3614         if (se == NULL) {
3615                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3616                 exit(1);
3617         }
3618
3619         monitor_cluster(ctdb);
3620
3621         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3622         return -1;
3623 }
3624
3625 /*
3626   shutdown the recovery daemon
3627  */
3628 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3629 {
3630         if (ctdb->recoverd_pid == 0) {
3631                 return;
3632         }
3633
3634         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3635         kill(ctdb->recoverd_pid, SIGTERM);
3636 }