client: New generic node listing function list_of_nodes()
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
/*
  One entry per "ctdb ipreallocate" process that wants to be called
  back once the takeover run has finished.
 */
struct ip_reallocate_list {
	/* next waiting caller, NULL at the end of the list */
	struct ip_reallocate_list *next;

	/* NOTE(review): presumably describes where the reply for this
	   caller goes - confirm against struct rd_memdump_reply */
	struct rd_memdump_reply *rd;
};
41
/*
  misbehaviour bookkeeping for one node, maintained by
  ctdb_set_culprit_count()
 */
struct ctdb_banning_state {
	/* culprit credits accumulated so far */
	uint32_t count;

	/* time the count was last increased */
	struct timeval last_reported_time;
};
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
/*
  Timeouts built from the daemon tunables.  Both macros expand to an
  expression that dereferences a variable called "ctdb", so they may
  only be used where a struct ctdb_context *ctdb is in scope.
 */
#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)

/* forward declaration; the definition lives elsewhere in this file */
static void ctdb_restart_recd(struct event_context *ev,
			      struct timed_event *te,
			      struct timeval t,
			      void *private_data);
74
75 /*
76   ban a node for a period of time
77  */
78 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
79 {
80         int ret;
81         struct ctdb_context *ctdb = rec->ctdb;
82         struct ctdb_ban_time bantime;
83        
84         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
85
86         if (!ctdb_validate_pnn(ctdb, pnn)) {
87                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
88                 return;
89         }
90
91         bantime.pnn  = pnn;
92         bantime.time = ban_time;
93
94         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
95         if (ret != 0) {
96                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
97                 return;
98         }
99
100 }
101
/* outcome codes of a monitoring step (the users are outside this section) */
enum monitor_result {
	MONITOR_OK,
	MONITOR_RECOVERY_NEEDED,
	MONITOR_ELECTION_NEEDED,
	MONITOR_FAILED
};
103
104
105 /*
106   run the "recovered" eventscript on all nodes
107  */
108 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
109 {
110         TALLOC_CTX *tmp_ctx;
111         uint32_t *nodes;
112
113         tmp_ctx = talloc_new(ctdb);
114         CTDB_NO_MEMORY(ctdb, tmp_ctx);
115
116         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
117         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
118                                         nodes, 0,
119                                         CONTROL_TIMEOUT(), false, tdb_null,
120                                         NULL, NULL,
121                                         NULL) != 0) {
122                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
123
124                 talloc_free(tmp_ctx);
125                 return -1;
126         }
127
128         talloc_free(tmp_ctx);
129         return 0;
130 }
131
132 /*
133   remember the trouble maker
134  */
135 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
136 {
137         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
138         struct ctdb_banning_state *ban_state;
139
140         if (culprit > ctdb->num_nodes) {
141                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
142                 return;
143         }
144
145         if (ctdb->nodes[culprit]->ban_state == NULL) {
146                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
147                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
148
149                 
150         }
151         ban_state = ctdb->nodes[culprit]->ban_state;
152         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
153                 /* this was the first time in a long while this node
154                    misbehaved so we will forgive any old transgressions.
155                 */
156                 ban_state->count = 0;
157         }
158
159         ban_state->count += count;
160         ban_state->last_reported_time = timeval_current();
161         rec->last_culprit_node = culprit;
162 }
163
/*
  remember the trouble maker: charge the node with a single credit
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
	const uint32_t one_credit = 1;

	ctdb_set_culprit_count(rec, culprit, one_credit);
}
171
172
173 /* this callback is called for every node that failed to execute the
174    start recovery event
175 */
176 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
177 {
178         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
179
180         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
181
182         ctdb_set_culprit(rec, node_pnn);
183 }
184
185 /*
186   run the "startrecovery" eventscript on all nodes
187  */
188 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
189 {
190         TALLOC_CTX *tmp_ctx;
191         uint32_t *nodes;
192         struct ctdb_context *ctdb = rec->ctdb;
193
194         tmp_ctx = talloc_new(ctdb);
195         CTDB_NO_MEMORY(ctdb, tmp_ctx);
196
197         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
198         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
199                                         nodes, 0,
200                                         CONTROL_TIMEOUT(), false, tdb_null,
201                                         NULL,
202                                         startrecovery_fail_callback,
203                                         rec) != 0) {
204                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
205                 talloc_free(tmp_ctx);
206                 return -1;
207         }
208
209         talloc_free(tmp_ctx);
210         return 0;
211 }
212
213 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
214 {
215         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
216                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
217                 return;
218         }
219         if (node_pnn < ctdb->num_nodes) {
220                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
221         }
222 }
223
224 /*
225   update the node capabilities for all connected nodes
226  */
227 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
228 {
229         uint32_t *nodes;
230         TALLOC_CTX *tmp_ctx;
231
232         tmp_ctx = talloc_new(ctdb);
233         CTDB_NO_MEMORY(ctdb, tmp_ctx);
234
235         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
236         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
237                                         nodes, 0,
238                                         CONTROL_TIMEOUT(),
239                                         false, tdb_null,
240                                         async_getcap_callback, NULL,
241                                         NULL) != 0) {
242                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
243                 talloc_free(tmp_ctx);
244                 return -1;
245         }
246
247         talloc_free(tmp_ctx);
248         return 0;
249 }
250
251 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
252 {
253         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
254
255         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
256         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
257 }
258
259 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
260 {
261         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
262
263         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
264         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
265 }
266
267 /*
268   change recovery mode on all nodes
269  */
270 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
271 {
272         TDB_DATA data;
273         uint32_t *nodes;
274         TALLOC_CTX *tmp_ctx;
275
276         tmp_ctx = talloc_new(ctdb);
277         CTDB_NO_MEMORY(ctdb, tmp_ctx);
278
279         /* freeze all nodes */
280         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
281         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
282                 int i;
283
284                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
285                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
286                                                 nodes, i,
287                                                 CONTROL_TIMEOUT(),
288                                                 false, tdb_null,
289                                                 NULL,
290                                                 set_recmode_fail_callback,
291                                                 rec) != 0) {
292                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
293                                 talloc_free(tmp_ctx);
294                                 return -1;
295                         }
296                 }
297         }
298
299
300         data.dsize = sizeof(uint32_t);
301         data.dptr = (unsigned char *)&rec_mode;
302
303         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
304                                         nodes, 0,
305                                         CONTROL_TIMEOUT(),
306                                         false, data,
307                                         NULL, NULL,
308                                         NULL) != 0) {
309                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
310                 talloc_free(tmp_ctx);
311                 return -1;
312         }
313
314         talloc_free(tmp_ctx);
315         return 0;
316 }
317
318 /*
319   change recovery master on all node
320  */
321 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
322 {
323         TDB_DATA data;
324         TALLOC_CTX *tmp_ctx;
325         uint32_t *nodes;
326
327         tmp_ctx = talloc_new(ctdb);
328         CTDB_NO_MEMORY(ctdb, tmp_ctx);
329
330         data.dsize = sizeof(uint32_t);
331         data.dptr = (unsigned char *)&pnn;
332
333         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
334         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
335                                         nodes, 0,
336                                         CONTROL_TIMEOUT(), false, data,
337                                         NULL, NULL,
338                                         NULL) != 0) {
339                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
340                 talloc_free(tmp_ctx);
341                 return -1;
342         }
343
344         talloc_free(tmp_ctx);
345         return 0;
346 }
347
348 /* update all remote nodes to use the same db priority that we have
349    this can fail if the remove node has not yet been upgraded to 
350    support this function, so we always return success and never fail
351    a recovery if this call fails.
352 */
353 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
354         struct ctdb_node_map *nodemap, 
355         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
356 {
357         int db;
358         uint32_t *nodes;
359
360         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
361
362         /* step through all local databases */
363         for (db=0; db<dbmap->num;db++) {
364                 TDB_DATA data;
365                 struct ctdb_db_priority db_prio;
366                 int ret;
367
368                 db_prio.db_id     = dbmap->dbs[db].dbid;
369                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
370                 if (ret != 0) {
371                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
372                         continue;
373                 }
374
375                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
376
377                 data.dptr  = (uint8_t *)&db_prio;
378                 data.dsize = sizeof(db_prio);
379
380                 if (ctdb_client_async_control(ctdb,
381                                         CTDB_CONTROL_SET_DB_PRIORITY,
382                                         nodes, 0,
383                                         CONTROL_TIMEOUT(), false, data,
384                                         NULL, NULL,
385                                         NULL) != 0) {
386                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
387                 }
388         }
389
390         return 0;
391 }                       
392
393 /*
394   ensure all other nodes have attached to any databases that we have
395  */
396 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
397                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
398 {
399         int i, j, db, ret;
400         struct ctdb_dbid_map *remote_dbmap;
401
402         /* verify that all other nodes have all our databases */
403         for (j=0; j<nodemap->num; j++) {
404                 /* we dont need to ourself ourselves */
405                 if (nodemap->nodes[j].pnn == pnn) {
406                         continue;
407                 }
408                 /* dont check nodes that are unavailable */
409                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
410                         continue;
411                 }
412
413                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
414                                          mem_ctx, &remote_dbmap);
415                 if (ret != 0) {
416                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
417                         return -1;
418                 }
419
420                 /* step through all local databases */
421                 for (db=0; db<dbmap->num;db++) {
422                         const char *name;
423
424
425                         for (i=0;i<remote_dbmap->num;i++) {
426                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
427                                         break;
428                                 }
429                         }
430                         /* the remote node already have this database */
431                         if (i!=remote_dbmap->num) {
432                                 continue;
433                         }
434                         /* ok so we need to create this database */
435                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
436                                             mem_ctx, &name);
437                         if (ret != 0) {
438                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
439                                 return -1;
440                         }
441                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
442                                            mem_ctx, name, dbmap->dbs[db].persistent);
443                         if (ret != 0) {
444                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
445                                 return -1;
446                         }
447                 }
448         }
449
450         return 0;
451 }
452
453
454 /*
455   ensure we are attached to any databases that anyone else is attached to
456  */
457 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
458                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
459 {
460         int i, j, db, ret;
461         struct ctdb_dbid_map *remote_dbmap;
462
463         /* verify that we have all database any other node has */
464         for (j=0; j<nodemap->num; j++) {
465                 /* we dont need to ourself ourselves */
466                 if (nodemap->nodes[j].pnn == pnn) {
467                         continue;
468                 }
469                 /* dont check nodes that are unavailable */
470                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
471                         continue;
472                 }
473
474                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
475                                          mem_ctx, &remote_dbmap);
476                 if (ret != 0) {
477                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
478                         return -1;
479                 }
480
481                 /* step through all databases on the remote node */
482                 for (db=0; db<remote_dbmap->num;db++) {
483                         const char *name;
484
485                         for (i=0;i<(*dbmap)->num;i++) {
486                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
487                                         break;
488                                 }
489                         }
490                         /* we already have this db locally */
491                         if (i!=(*dbmap)->num) {
492                                 continue;
493                         }
494                         /* ok so we need to create this database and
495                            rebuild dbmap
496                          */
497                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
498                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
499                         if (ret != 0) {
500                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
501                                           nodemap->nodes[j].pnn));
502                                 return -1;
503                         }
504                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
505                                            remote_dbmap->dbs[db].persistent);
506                         if (ret != 0) {
507                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
508                                 return -1;
509                         }
510                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
511                         if (ret != 0) {
512                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
513                                 return -1;
514                         }
515                 }
516         }
517
518         return 0;
519 }
520
521
522 /*
523   pull the remote database contents from one node into the recdb
524  */
525 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
526                                     struct tdb_wrap *recdb, uint32_t dbid,
527                                     bool persistent)
528 {
529         int ret;
530         TDB_DATA outdata;
531         struct ctdb_marshall_buffer *reply;
532         struct ctdb_rec_data *rec;
533         int i;
534         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
535
536         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
537                                CONTROL_TIMEOUT(), &outdata);
538         if (ret != 0) {
539                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
540                 talloc_free(tmp_ctx);
541                 return -1;
542         }
543
544         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
545
546         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
547                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
548                 talloc_free(tmp_ctx);
549                 return -1;
550         }
551         
552         rec = (struct ctdb_rec_data *)&reply->data[0];
553         
554         for (i=0;
555              i<reply->count;
556              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
557                 TDB_DATA key, data;
558                 struct ctdb_ltdb_header *hdr;
559                 TDB_DATA existing;
560                 
561                 key.dptr = &rec->data[0];
562                 key.dsize = rec->keylen;
563                 data.dptr = &rec->data[key.dsize];
564                 data.dsize = rec->datalen;
565                 
566                 hdr = (struct ctdb_ltdb_header *)data.dptr;
567
568                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
569                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
570                         talloc_free(tmp_ctx);
571                         return -1;
572                 }
573
574                 /* fetch the existing record, if any */
575                 existing = tdb_fetch(recdb->tdb, key);
576                 
577                 if (existing.dptr != NULL) {
578                         struct ctdb_ltdb_header header;
579                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
580                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
581                                          (unsigned)existing.dsize, srcnode));
582                                 free(existing.dptr);
583                                 talloc_free(tmp_ctx);
584                                 return -1;
585                         }
586                         header = *(struct ctdb_ltdb_header *)existing.dptr;
587                         free(existing.dptr);
588                         if (!(header.rsn < hdr->rsn ||
589                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
590                                 continue;
591                         }
592                 }
593                 
594                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
595                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
596                         talloc_free(tmp_ctx);
597                         return -1;                              
598                 }
599         }
600
601         talloc_free(tmp_ctx);
602
603         return 0;
604 }
605
606 /*
607   pull all the remote database contents into the recdb
608  */
609 static int pull_remote_database(struct ctdb_context *ctdb,
610                                 struct ctdb_recoverd *rec, 
611                                 struct ctdb_node_map *nodemap, 
612                                 struct tdb_wrap *recdb, uint32_t dbid,
613                                 bool persistent)
614 {
615         int j;
616
617         /* pull all records from all other nodes across onto this node
618            (this merges based on rsn)
619         */
620         for (j=0; j<nodemap->num; j++) {
621                 /* dont merge from nodes that are unavailable */
622                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
623                         continue;
624                 }
625                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
626                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
627                                  nodemap->nodes[j].pnn));
628                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
629                         return -1;
630                 }
631         }
632         
633         return 0;
634 }
635
636
637 /*
638   update flags on all active nodes
639  */
640 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
641 {
642         int ret;
643
644         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
645                 if (ret != 0) {
646                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
647                 return -1;
648         }
649
650         return 0;
651 }
652
653 /*
654   ensure all nodes have the same vnnmap we do
655  */
656 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
657                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
658 {
659         int j, ret;
660
661         /* push the new vnn map out to all the nodes */
662         for (j=0; j<nodemap->num; j++) {
663                 /* dont push to nodes that are unavailable */
664                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
665                         continue;
666                 }
667
668                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
669                 if (ret != 0) {
670                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
671                         return -1;
672                 }
673         }
674
675         return 0;
676 }
677
678
/*
  state of one vacuum fetch run; instances are linked into the
  vacuum_info list of the recovery daemon
 */
struct vacuum_info {
	/* doubly linked list pointers */
	struct vacuum_info *next;
	struct vacuum_info *prev;

	/* owning recovery daemon, holds the list head */
	struct ctdb_recoverd *rec;

	/* node the records were sent from */
	uint32_t srcnode;

	/* database the records belong to */
	struct ctdb_db_context *ctdb_db;

	/* the received records; count is decremented as they are used */
	struct ctdb_marshall_buffer *recs;

	/* next record inside recs to be processed */
	struct ctdb_rec_data *r;
};
687
688 static void vacuum_fetch_next(struct vacuum_info *v);
689
690 /*
691   called when a vacuum fetch has completed - just free it and do the next one
692  */
693 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
694 {
695         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
696         talloc_free(state);
697         vacuum_fetch_next(v);
698 }
699
700
701 /*
702   process the next element from the vacuum list
703 */
704 static void vacuum_fetch_next(struct vacuum_info *v)
705 {
706         struct ctdb_call call;
707         struct ctdb_rec_data *r;
708
709         while (v->recs->count) {
710                 struct ctdb_client_call_state *state;
711                 TDB_DATA data;
712                 struct ctdb_ltdb_header *hdr;
713
714                 ZERO_STRUCT(call);
715                 call.call_id = CTDB_NULL_FUNC;
716                 call.flags = CTDB_IMMEDIATE_MIGRATION;
717                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
718
719                 r = v->r;
720                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
721                 v->recs->count--;
722
723                 call.key.dptr = &r->data[0];
724                 call.key.dsize = r->keylen;
725
726                 /* ensure we don't block this daemon - just skip a record if we can't get
727                    the chainlock */
728                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
729                         continue;
730                 }
731
732                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
733                 if (data.dptr == NULL) {
734                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
735                         continue;
736                 }
737
738                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
739                         free(data.dptr);
740                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
741                         continue;
742                 }
743                 
744                 hdr = (struct ctdb_ltdb_header *)data.dptr;
745                 if (hdr->dmaster == v->rec->ctdb->pnn) {
746                         /* its already local */
747                         free(data.dptr);
748                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
749                         continue;
750                 }
751
752                 free(data.dptr);
753
754                 state = ctdb_call_send(v->ctdb_db, &call);
755                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
756                 if (state == NULL) {
757                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
758                         talloc_free(v);
759                         return;
760                 }
761                 state->async.fn = vacuum_fetch_callback;
762                 state->async.private_data = v;
763                 return;
764         }
765
766         talloc_free(v);
767 }
768
769
770 /*
771   destroy a vacuum info structure
772  */
773 static int vacuum_info_destructor(struct vacuum_info *v)
774 {
775         DLIST_REMOVE(v->rec->vacuum_info, v);
776         return 0;
777 }
778
779
780 /*
781   handler for vacuum fetch
782 */
783 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
784                                  TDB_DATA data, void *private_data)
785 {
786         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
787         struct ctdb_marshall_buffer *recs;
788         int ret, i;
789         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
790         const char *name;
791         struct ctdb_dbid_map *dbmap=NULL;
792         bool persistent = false;
793         struct ctdb_db_context *ctdb_db;
794         struct ctdb_rec_data *r;
795         uint32_t srcnode;
796         struct vacuum_info *v;
797
798         recs = (struct ctdb_marshall_buffer *)data.dptr;
799         r = (struct ctdb_rec_data *)&recs->data[0];
800
801         if (recs->count == 0) {
802                 talloc_free(tmp_ctx);
803                 return;
804         }
805
806         srcnode = r->reqid;
807
808         for (v=rec->vacuum_info;v;v=v->next) {
809                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
810                         /* we're already working on records from this node */
811                         talloc_free(tmp_ctx);
812                         return;
813                 }
814         }
815
816         /* work out if the database is persistent */
817         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
818         if (ret != 0) {
819                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
820                 talloc_free(tmp_ctx);
821                 return;
822         }
823
824         for (i=0;i<dbmap->num;i++) {
825                 if (dbmap->dbs[i].dbid == recs->db_id) {
826                         persistent = dbmap->dbs[i].persistent;
827                         break;
828                 }
829         }
830         if (i == dbmap->num) {
831                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
832                 talloc_free(tmp_ctx);
833                 return;         
834         }
835
836         /* find the name of this database */
837         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
838                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
839                 talloc_free(tmp_ctx);
840                 return;
841         }
842
843         /* attach to it */
844         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
845         if (ctdb_db == NULL) {
846                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
847                 talloc_free(tmp_ctx);
848                 return;
849         }
850
851         v = talloc_zero(rec, struct vacuum_info);
852         if (v == NULL) {
853                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
854                 talloc_free(tmp_ctx);
855                 return;
856         }
857
858         v->rec = rec;
859         v->srcnode = srcnode;
860         v->ctdb_db = ctdb_db;
861         v->recs = talloc_memdup(v, recs, data.dsize);
862         if (v->recs == NULL) {
863                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
864                 talloc_free(v);
865                 talloc_free(tmp_ctx);
866                 return;         
867         }
868         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
869
870         DLIST_ADD(rec->vacuum_info, v);
871
872         talloc_set_destructor(v, vacuum_info_destructor);
873
874         vacuum_fetch_next(v);
875         talloc_free(tmp_ctx);
876 }
877
878
/*
  timer callback used by ctdb_wait_timeout(): 'p' points at a uint32_t
  flag which is set to 1 to tell the waiter that the time is up.
  The event context, timer and timestamp arguments are not used.
 */
static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
                              struct timeval yt, void *p)
{
        uint32_t *expired_flag = p;

        *expired_flag = 1;
}
888
889 /*
890   wait for a given number of seconds
891  */
892 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
893 {
894         uint32_t timed_out = 0;
895         time_t usecs = (secs - (time_t)secs) * 1000000;
896         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
897         while (!timed_out) {
898                 event_loop_once(ctdb->ev);
899         }
900 }
901
902 /*
903   called when an election times out (ends)
904  */
905 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
906                                   struct timeval t, void *p)
907 {
908         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
909         rec->election_timeout = NULL;
910         fast_start = false;
911
912         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
913 }
914
915
916 /*
917   wait for an election to finish. It finished election_timeout seconds after
918   the last election packet is received
919  */
920 static void ctdb_wait_election(struct ctdb_recoverd *rec)
921 {
922         struct ctdb_context *ctdb = rec->ctdb;
923         while (rec->election_timeout) {
924                 event_loop_once(ctdb->ev);
925         }
926 }
927
928 /*
929   Update our local flags from all remote connected nodes. 
930   This is only run when we are or we belive we are the recovery master
931  */
932 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
933 {
934         int j;
935         struct ctdb_context *ctdb = rec->ctdb;
936         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
937
938         /* get the nodemap for all active remote nodes and verify
939            they are the same as for this node
940          */
941         for (j=0; j<nodemap->num; j++) {
942                 struct ctdb_node_map *remote_nodemap=NULL;
943                 int ret;
944
945                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
946                         continue;
947                 }
948                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
949                         continue;
950                 }
951
952                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
953                                            mem_ctx, &remote_nodemap);
954                 if (ret != 0) {
955                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
956                                   nodemap->nodes[j].pnn));
957                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
958                         talloc_free(mem_ctx);
959                         return MONITOR_FAILED;
960                 }
961                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
962                         /* We should tell our daemon about this so it
963                            updates its flags or else we will log the same 
964                            message again in the next iteration of recovery.
965                            Since we are the recovery master we can just as
966                            well update the flags on all nodes.
967                         */
968                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
969                         if (ret != 0) {
970                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
971                                 return -1;
972                         }
973
974                         /* Update our local copy of the flags in the recovery
975                            daemon.
976                         */
977                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
978                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
979                                  nodemap->nodes[j].flags));
980                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
981                 }
982                 talloc_free(remote_nodemap);
983         }
984         talloc_free(mem_ctx);
985         return MONITOR_OK;
986 }
987
988
989 /* Create a new random generation ip. 
990    The generation id can not be the INVALID_GENERATION id
991 */
992 static uint32_t new_generation(void)
993 {
994         uint32_t generation;
995
996         while (1) {
997                 generation = random();
998
999                 if (generation != INVALID_GENERATION) {
1000                         break;
1001                 }
1002         }
1003
1004         return generation;
1005 }
1006
1007
1008 /*
1009   create a temporary working database
1010  */
1011 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1012 {
1013         char *name;
1014         struct tdb_wrap *recdb;
1015         unsigned tdb_flags;
1016
1017         /* open up the temporary recovery database */
1018         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1019                                ctdb->db_directory_state,
1020                                ctdb->pnn);
1021         if (name == NULL) {
1022                 return NULL;
1023         }
1024         unlink(name);
1025
1026         tdb_flags = TDB_NOLOCK;
1027         if (ctdb->valgrinding) {
1028                 tdb_flags |= TDB_NOMMAP;
1029         }
1030         tdb_flags |= TDB_DISALLOW_NESTING;
1031
1032         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1033                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1034         if (recdb == NULL) {
1035                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1036         }
1037
1038         talloc_free(name);
1039
1040         return recdb;
1041 }
1042
1043
1044 /* 
1045    a traverse function for pulling all relevent records from recdb
1046  */
1047 struct recdb_data {
1048         struct ctdb_context *ctdb;
1049         struct ctdb_marshall_buffer *recdata;
1050         uint32_t len;
1051         bool failed;
1052         bool persistent;
1053 };
1054
1055 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1056 {
1057         struct recdb_data *params = (struct recdb_data *)p;
1058         struct ctdb_rec_data *rec;
1059         struct ctdb_ltdb_header *hdr;
1060
1061         /* skip empty records */
1062         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1063                 return 0;
1064         }
1065
1066         /* update the dmaster field to point to us */
1067         hdr = (struct ctdb_ltdb_header *)data.dptr;
1068         if (!params->persistent) {
1069                 hdr->dmaster = params->ctdb->pnn;
1070                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1071         }
1072
1073         /* add the record to the blob ready to send to the nodes */
1074         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1075         if (rec == NULL) {
1076                 params->failed = true;
1077                 return -1;
1078         }
1079         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1080         if (params->recdata == NULL) {
1081                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1082                          rec->length + params->len, params->recdata->count));
1083                 params->failed = true;
1084                 return -1;
1085         }
1086         params->recdata->count++;
1087         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1088         params->len += rec->length;
1089         talloc_free(rec);
1090
1091         return 0;
1092 }
1093
1094 /*
1095   push the recdb database out to all nodes
1096  */
1097 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1098                                bool persistent,
1099                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1100 {
1101         struct recdb_data params;
1102         struct ctdb_marshall_buffer *recdata;
1103         TDB_DATA outdata;
1104         TALLOC_CTX *tmp_ctx;
1105         uint32_t *nodes;
1106
1107         tmp_ctx = talloc_new(ctdb);
1108         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1109
1110         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1111         CTDB_NO_MEMORY(ctdb, recdata);
1112
1113         recdata->db_id = dbid;
1114
1115         params.ctdb = ctdb;
1116         params.recdata = recdata;
1117         params.len = offsetof(struct ctdb_marshall_buffer, data);
1118         params.failed = false;
1119         params.persistent = persistent;
1120
1121         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1122                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1123                 talloc_free(params.recdata);
1124                 talloc_free(tmp_ctx);
1125                 return -1;
1126         }
1127
1128         if (params.failed) {
1129                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1130                 talloc_free(params.recdata);
1131                 talloc_free(tmp_ctx);
1132                 return -1;              
1133         }
1134
1135         recdata = params.recdata;
1136
1137         outdata.dptr = (void *)recdata;
1138         outdata.dsize = params.len;
1139
1140         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1141         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1142                                         nodes, 0,
1143                                         CONTROL_TIMEOUT(), false, outdata,
1144                                         NULL, NULL,
1145                                         NULL) != 0) {
1146                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1147                 talloc_free(recdata);
1148                 talloc_free(tmp_ctx);
1149                 return -1;
1150         }
1151
1152         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1153                   dbid, recdata->count));
1154
1155         talloc_free(recdata);
1156         talloc_free(tmp_ctx);
1157
1158         return 0;
1159 }
1160
1161
1162 /*
1163   go through a full recovery on one database 
1164  */
1165 static int recover_database(struct ctdb_recoverd *rec, 
1166                             TALLOC_CTX *mem_ctx,
1167                             uint32_t dbid,
1168                             bool persistent,
1169                             uint32_t pnn, 
1170                             struct ctdb_node_map *nodemap,
1171                             uint32_t transaction_id)
1172 {
1173         struct tdb_wrap *recdb;
1174         int ret;
1175         struct ctdb_context *ctdb = rec->ctdb;
1176         TDB_DATA data;
1177         struct ctdb_control_wipe_database w;
1178         uint32_t *nodes;
1179
1180         recdb = create_recdb(ctdb, mem_ctx);
1181         if (recdb == NULL) {
1182                 return -1;
1183         }
1184
1185         /* pull all remote databases onto the recdb */
1186         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1187         if (ret != 0) {
1188                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1189                 return -1;
1190         }
1191
1192         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1193
1194         /* wipe all the remote databases. This is safe as we are in a transaction */
1195         w.db_id = dbid;
1196         w.transaction_id = transaction_id;
1197
1198         data.dptr = (void *)&w;
1199         data.dsize = sizeof(w);
1200
1201         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1202         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1203                                         nodes, 0,
1204                                         CONTROL_TIMEOUT(), false, data,
1205                                         NULL, NULL,
1206                                         NULL) != 0) {
1207                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1208                 talloc_free(recdb);
1209                 return -1;
1210         }
1211         
1212         /* push out the correct database. This sets the dmaster and skips 
1213            the empty records */
1214         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1215         if (ret != 0) {
1216                 talloc_free(recdb);
1217                 return -1;
1218         }
1219
1220         /* all done with this database */
1221         talloc_free(recdb);
1222
1223         return 0;
1224 }
1225
1226 /*
1227   reload the nodes file 
1228 */
1229 static void reload_nodes_file(struct ctdb_context *ctdb)
1230 {
1231         ctdb->nodes = NULL;
1232         ctdb_load_nodes_file(ctdb);
1233 }
1234
1235 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1236                                          struct ctdb_recoverd *rec,
1237                                          struct ctdb_node_map *nodemap,
1238                                          uint32_t *culprit)
1239 {
1240         int j;
1241         int ret;
1242
1243         if (ctdb->num_nodes != nodemap->num) {
1244                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1245                                   ctdb->num_nodes, nodemap->num));
1246                 if (culprit) {
1247                         *culprit = ctdb->pnn;
1248                 }
1249                 return -1;
1250         }
1251
1252         for (j=0; j<nodemap->num; j++) {
1253                 /* release any existing data */
1254                 if (ctdb->nodes[j]->known_public_ips) {
1255                         talloc_free(ctdb->nodes[j]->known_public_ips);
1256                         ctdb->nodes[j]->known_public_ips = NULL;
1257                 }
1258                 if (ctdb->nodes[j]->available_public_ips) {
1259                         talloc_free(ctdb->nodes[j]->available_public_ips);
1260                         ctdb->nodes[j]->available_public_ips = NULL;
1261                 }
1262
1263                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1264                         continue;
1265                 }
1266
1267                 /* grab a new shiny list of public ips from the node */
1268                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1269                                         CONTROL_TIMEOUT(),
1270                                         ctdb->nodes[j]->pnn,
1271                                         ctdb->nodes,
1272                                         0,
1273                                         &ctdb->nodes[j]->known_public_ips);
1274                 if (ret != 0) {
1275                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1276                                 ctdb->nodes[j]->pnn));
1277                         if (culprit) {
1278                                 *culprit = ctdb->nodes[j]->pnn;
1279                         }
1280                         return -1;
1281                 }
1282
1283                 if (ctdb->tunable.disable_ip_failover == 0) {
1284                         if (rec->ip_check_disable_ctx == NULL) {
1285                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1286                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1287                                         rec->need_takeover_run = true;
1288                                 }
1289                         }
1290                 }
1291
1292                 /* grab a new shiny list of public ips from the node */
1293                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1294                                         CONTROL_TIMEOUT(),
1295                                         ctdb->nodes[j]->pnn,
1296                                         ctdb->nodes,
1297                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1298                                         &ctdb->nodes[j]->available_public_ips);
1299                 if (ret != 0) {
1300                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1301                                 ctdb->nodes[j]->pnn));
1302                         if (culprit) {
1303                                 *culprit = ctdb->nodes[j]->pnn;
1304                         }
1305                         return -1;
1306                 }
1307         }
1308
1309         return 0;
1310 }
1311
1312 /* when we start a recovery, make sure all nodes use the same reclock file
1313    setting
1314 */
1315 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1316 {
1317         struct ctdb_context *ctdb = rec->ctdb;
1318         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1319         TDB_DATA data;
1320         uint32_t *nodes;
1321
1322         if (ctdb->recovery_lock_file == NULL) {
1323                 data.dptr  = NULL;
1324                 data.dsize = 0;
1325         } else {
1326                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1327                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1328         }
1329
1330         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1331         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1332                                         nodes, 0,
1333                                         CONTROL_TIMEOUT(),
1334                                         false, data,
1335                                         NULL, NULL,
1336                                         rec) != 0) {
1337                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1338                 talloc_free(tmp_ctx);
1339                 return -1;
1340         }
1341
1342         talloc_free(tmp_ctx);
1343         return 0;
1344 }
1345
1346
1347 /*
1348  * this callback is called for every node that failed to execute ctdb_takeover_run()
1349  * and set flag to re-run takeover run.
1350  */
1351 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1352 {
1353         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1354
1355         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the takeover run. Setting it as recovery fail culprit\n", node_pnn));
1356
1357         ctdb_set_culprit(rec, node_pnn);
1358         rec->need_takeover_run = true;
1359 }
1360
1361
1362 /*
1363   we are the recmaster, and recovery is needed - start a recovery run
1364  */
1365 static int do_recovery(struct ctdb_recoverd *rec, 
1366                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1367                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1368 {
1369         struct ctdb_context *ctdb = rec->ctdb;
1370         int i, j, ret;
1371         uint32_t generation;
1372         struct ctdb_dbid_map *dbmap;
1373         TDB_DATA data;
1374         uint32_t *nodes;
1375         struct timeval start_time;
1376         uint32_t culprit = (uint32_t)-1;
1377
1378         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1379
1380         /* if recovery fails, force it again */
1381         rec->need_recovery = true;
1382
1383         for (i=0; i<ctdb->num_nodes; i++) {
1384                 struct ctdb_banning_state *ban_state;
1385
1386                 if (ctdb->nodes[i]->ban_state == NULL) {
1387                         continue;
1388                 }
1389                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1390                 if (ban_state->count < 2*ctdb->num_nodes) {
1391                         continue;
1392                 }
1393                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1394                         ctdb->nodes[i]->pnn, ban_state->count,
1395                         ctdb->tunable.recovery_ban_period));
1396                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1397                 ban_state->count = 0;
1398         }
1399
1400
1401         if (ctdb->tunable.verify_recovery_lock != 0) {
1402                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1403                 start_time = timeval_current();
1404                 if (!ctdb_recovery_lock(ctdb, true)) {
1405                         ctdb_set_culprit(rec, pnn);
1406                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1407                         return -1;
1408                 }
1409                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1410                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1411         }
1412
1413         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1414
1415         /* get a list of all databases */
1416         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1417         if (ret != 0) {
1418                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1419                 return -1;
1420         }
1421
1422         /* we do the db creation before we set the recovery mode, so the freeze happens
1423            on all databases we will be dealing with. */
1424
1425         /* verify that we have all the databases any other node has */
1426         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1427         if (ret != 0) {
1428                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1429                 return -1;
1430         }
1431
1432         /* verify that all other nodes have all our databases */
1433         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1434         if (ret != 0) {
1435                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1436                 return -1;
1437         }
1438         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1439
1440         /* update the database priority for all remote databases */
1441         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1442         if (ret != 0) {
1443                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1444         }
1445         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1446
1447
1448         /* update all other nodes to use the same setting for reclock files
1449            as the local recovery master.
1450         */
1451         sync_recovery_lock_file_across_cluster(rec);
1452
1453         /* set recovery mode to active on all nodes */
1454         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1455         if (ret != 0) {
1456                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1457                 return -1;
1458         }
1459
1460         /* execute the "startrecovery" event script on all nodes */
1461         ret = run_startrecovery_eventscript(rec, nodemap);
1462         if (ret!=0) {
1463                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1464                 return -1;
1465         }
1466
1467         /*
1468           update all nodes to have the same flags that we have
1469          */
1470         for (i=0;i<nodemap->num;i++) {
1471                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1472                         continue;
1473                 }
1474
1475                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1476                 if (ret != 0) {
1477                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1478                         return -1;
1479                 }
1480         }
1481
1482         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1483
1484         /* pick a new generation number */
1485         generation = new_generation();
1486
1487         /* change the vnnmap on this node to use the new generation 
1488            number but not on any other nodes.
1489            this guarantees that if we abort the recovery prematurely
1490            for some reason (a node stops responding?)
1491            that we can just return immediately and we will reenter
1492            recovery shortly again.
1493            I.e. we deliberately leave the cluster with an inconsistent
1494            generation id to allow us to abort recovery at any stage and
1495            just restart it from scratch.
1496          */
1497         vnnmap->generation = generation;
1498         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1499         if (ret != 0) {
1500                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1501                 return -1;
1502         }
1503
1504         data.dptr = (void *)&generation;
1505         data.dsize = sizeof(uint32_t);
1506
1507         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1508         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1509                                         nodes, 0,
1510                                         CONTROL_TIMEOUT(), false, data,
1511                                         NULL,
1512                                         transaction_start_fail_callback,
1513                                         rec) != 0) {
1514                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1515                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1516                                         nodes, 0,
1517                                         CONTROL_TIMEOUT(), false, tdb_null,
1518                                         NULL,
1519                                         NULL,
1520                                         NULL) != 0) {
1521                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1522                 }
1523                 return -1;
1524         }
1525
1526         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1527
1528         for (i=0;i<dbmap->num;i++) {
1529                 ret = recover_database(rec, mem_ctx,
1530                                        dbmap->dbs[i].dbid,
1531                                        dbmap->dbs[i].persistent,
1532                                        pnn, nodemap, generation);
1533                 if (ret != 0) {
1534                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1535                         return -1;
1536                 }
1537         }
1538
1539         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1540
1541         /* commit all the changes */
1542         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1543                                         nodes, 0,
1544                                         CONTROL_TIMEOUT(), false, data,
1545                                         NULL, NULL,
1546                                         NULL) != 0) {
1547                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1548                 return -1;
1549         }
1550
1551         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1552         
1553
1554         /* update the capabilities for all nodes */
1555         ret = update_capabilities(ctdb, nodemap);
1556         if (ret!=0) {
1557                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1558                 return -1;
1559         }
1560
1561         /* build a new vnn map with all the currently active and
1562            unbanned nodes */
1563         generation = new_generation();
1564         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1565         CTDB_NO_MEMORY(ctdb, vnnmap);
1566         vnnmap->generation = generation;
1567         vnnmap->size = 0;
1568         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1569         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1570         for (i=j=0;i<nodemap->num;i++) {
1571                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1572                         continue;
1573                 }
1574                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1575                         /* this node can not be an lmaster */
1576                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1577                         continue;
1578                 }
1579
1580                 vnnmap->size++;
1581                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1582                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1583                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1584
1585         }
1586         if (vnnmap->size == 0) {
1587                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1588                 vnnmap->size++;
1589                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1590                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1591                 vnnmap->map[0] = pnn;
1592         }       
1593
1594         /* update to the new vnnmap on all nodes */
1595         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1596         if (ret != 0) {
1597                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1598                 return -1;
1599         }
1600
1601         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1602
1603         /* update recmaster to point to us for all nodes */
1604         ret = set_recovery_master(ctdb, nodemap, pnn);
1605         if (ret!=0) {
1606                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1607                 return -1;
1608         }
1609
1610         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1611
1612         /*
1613           update all nodes to have the same flags that we have
1614          */
1615         for (i=0;i<nodemap->num;i++) {
1616                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1617                         continue;
1618                 }
1619
1620                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1621                 if (ret != 0) {
1622                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1623                         return -1;
1624                 }
1625         }
1626
1627         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1628
1629         /* disable recovery mode */
1630         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1631         if (ret != 0) {
1632                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1633                 return -1;
1634         }
1635
1636         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1637
1638         /*
1639           tell nodes to takeover their public IPs
1640          */
1641         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1642         if (ret != 0) {
1643                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1644                                  culprit));
1645                 rec->need_takeover_run = true;
1646                 return -1;
1647         }
1648         rec->need_takeover_run = false;
1649         ret = ctdb_takeover_run(ctdb, nodemap, NULL, NULL);
1650         if (ret != 0) {
1651                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1652                 rec->need_takeover_run = true;
1653         }
1654
1655         /* execute the "recovered" event script on all nodes */
1656         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1657         if (ret!=0) {
1658                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1659                 return -1;
1660         }
1661
1662         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1663
1664         /* send a message to all clients telling them that the cluster 
1665            has been reconfigured */
1666         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1667
1668         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1669
1670         rec->need_recovery = false;
1671
1672         /* we managed to complete a full recovery, make sure to forgive
1673            any past sins by the nodes that could now participate in the
1674            recovery.
1675         */
1676         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1677         for (i=0;i<nodemap->num;i++) {
1678                 struct ctdb_banning_state *ban_state;
1679
1680                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1681                         continue;
1682                 }
1683
1684                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1685                 if (ban_state == NULL) {
1686                         continue;
1687                 }
1688
1689                 ban_state->count = 0;
1690         }
1691
1692
1693         /* We just finished a recovery successfully. 
1694            We now wait for rerecovery_timeout before we allow 
1695            another recovery to take place.
1696         */
1697         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1698         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1699         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1700
1701         return 0;
1702 }
1703
1704
/*
  election data for one node.

  elections are won by first checking the number of connected nodes, then
  the priority time, then the pnn

  send_election_request() ships this structure as the raw bytes of the
  election message and election_handler() casts the received buffer
  straight back to it, so the field layout is part of the message format.
 */
struct election_message {
        uint32_t num_connected;         /* nodes the sender does not see as DISCONNECTED
                                           (forced to 0 if it lacks CTDB_CAP_RECMASTER) */
        struct timeval priority_time;   /* taken from the sender's rec->priority_time */
        uint32_t pnn;                   /* pnn of the node this data describes */
        uint32_t node_flags;            /* the sender's own flags from its nodemap */
};
1715
1716 /*
1717   form this nodes election data
1718  */
1719 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1720 {
1721         int ret, i;
1722         struct ctdb_node_map *nodemap;
1723         struct ctdb_context *ctdb = rec->ctdb;
1724
1725         ZERO_STRUCTP(em);
1726
1727         em->pnn = rec->ctdb->pnn;
1728         em->priority_time = rec->priority_time;
1729
1730         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1731         if (ret != 0) {
1732                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1733                 return;
1734         }
1735
1736         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1737         em->node_flags = rec->node_flags;
1738
1739         for (i=0;i<nodemap->num;i++) {
1740                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1741                         em->num_connected++;
1742                 }
1743         }
1744
1745         /* we shouldnt try to win this election if we cant be a recmaster */
1746         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1747                 em->num_connected = 0;
1748                 em->priority_time = timeval_current();
1749         }
1750
1751         talloc_free(nodemap);
1752 }
1753
1754 /*
1755   see if the given election data wins
1756  */
1757 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1758 {
1759         struct election_message myem;
1760         int cmp = 0;
1761
1762         ctdb_election_data(rec, &myem);
1763
1764         /* we cant win if we dont have the recmaster capability */
1765         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1766                 return false;
1767         }
1768
1769         /* we cant win if we are banned */
1770         if (rec->node_flags & NODE_FLAGS_BANNED) {
1771                 return false;
1772         }       
1773
1774         /* we cant win if we are stopped */
1775         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1776                 return false;
1777         }       
1778
1779         /* we will automatically win if the other node is banned */
1780         if (em->node_flags & NODE_FLAGS_BANNED) {
1781                 return true;
1782         }
1783
1784         /* we will automatically win if the other node is banned */
1785         if (em->node_flags & NODE_FLAGS_STOPPED) {
1786                 return true;
1787         }
1788
1789         /* try to use the most connected node */
1790         if (cmp == 0) {
1791                 cmp = (int)myem.num_connected - (int)em->num_connected;
1792         }
1793
1794         /* then the longest running node */
1795         if (cmp == 0) {
1796                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1797         }
1798
1799         if (cmp == 0) {
1800                 cmp = (int)myem.pnn - (int)em->pnn;
1801         }
1802
1803         return cmp > 0;
1804 }
1805
1806 /*
1807   send out an election request
1808  */
1809 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1810 {
1811         int ret;
1812         TDB_DATA election_data;
1813         struct election_message emsg;
1814         uint64_t srvid;
1815         struct ctdb_context *ctdb = rec->ctdb;
1816
1817         srvid = CTDB_SRVID_RECOVERY;
1818
1819         ctdb_election_data(rec, &emsg);
1820
1821         election_data.dsize = sizeof(struct election_message);
1822         election_data.dptr  = (unsigned char *)&emsg;
1823
1824
1825         /* send an election message to all active nodes */
1826         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1827         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1828
1829
1830         /* A new node that is already frozen has entered the cluster.
1831            The existing nodes are not frozen and dont need to be frozen
1832            until the election has ended and we start the actual recovery
1833         */
1834         if (update_recmaster == true) {
1835                 /* first we assume we will win the election and set 
1836                    recoverymaster to be ourself on the current node
1837                  */
1838                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1839                 if (ret != 0) {
1840                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1841                         return -1;
1842                 }
1843         }
1844
1845
1846         return 0;
1847 }
1848
1849 /*
1850   this function will unban all nodes in the cluster
1851 */
1852 static void unban_all_nodes(struct ctdb_context *ctdb)
1853 {
1854         int ret, i;
1855         struct ctdb_node_map *nodemap;
1856         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1857         
1858         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1859         if (ret != 0) {
1860                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1861                 return;
1862         }
1863
1864         for (i=0;i<nodemap->num;i++) {
1865                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1866                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1867                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1868                 }
1869         }
1870
1871         talloc_free(tmp_ctx);
1872 }
1873
1874
1875 /*
1876   we think we are winning the election - send a broadcast election request
1877  */
1878 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1879 {
1880         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1881         int ret;
1882
1883         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1884         if (ret != 0) {
1885                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1886         }
1887
1888         talloc_free(rec->send_election_te);
1889         rec->send_election_te = NULL;
1890 }
1891
1892 /*
1893   handler for memory dumps
1894 */
1895 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1896                              TDB_DATA data, void *private_data)
1897 {
1898         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1899         TDB_DATA *dump;
1900         int ret;
1901         struct rd_memdump_reply *rd;
1902
1903         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1904                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1905                 talloc_free(tmp_ctx);
1906                 return;
1907         }
1908         rd = (struct rd_memdump_reply *)data.dptr;
1909
1910         dump = talloc_zero(tmp_ctx, TDB_DATA);
1911         if (dump == NULL) {
1912                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1913                 talloc_free(tmp_ctx);
1914                 return;
1915         }
1916         ret = ctdb_dump_memory(ctdb, dump);
1917         if (ret != 0) {
1918                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1919                 talloc_free(tmp_ctx);
1920                 return;
1921         }
1922
1923 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1924
1925         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1926         if (ret != 0) {
1927                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1928                 talloc_free(tmp_ctx);
1929                 return;
1930         }
1931
1932         talloc_free(tmp_ctx);
1933 }
1934
1935 /*
1936   handler for reload_nodes
1937 */
1938 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1939                              TDB_DATA data, void *private_data)
1940 {
1941         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1942
1943         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1944
1945         reload_nodes_file(rec->ctdb);
1946 }
1947
1948
1949 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1950                               struct timeval yt, void *p)
1951 {
1952         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1953
1954         talloc_free(rec->ip_check_disable_ctx);
1955         rec->ip_check_disable_ctx = NULL;
1956 }
1957
1958
1959 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1960                              TDB_DATA data, void *private_data)
1961 {
1962         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1963         struct ctdb_public_ip *ip;
1964
1965         if (rec->recmaster != rec->ctdb->pnn) {
1966                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1967                 return;
1968         }
1969
1970         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1971                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1972                 return;
1973         }
1974
1975         ip = (struct ctdb_public_ip *)data.dptr;
1976
1977         update_ip_assignment_tree(rec->ctdb, ip);
1978 }
1979
1980
1981 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1982                              TDB_DATA data, void *private_data)
1983 {
1984         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1985         uint32_t timeout;
1986
1987         if (rec->ip_check_disable_ctx != NULL) {
1988                 talloc_free(rec->ip_check_disable_ctx);
1989                 rec->ip_check_disable_ctx = NULL;
1990         }
1991
1992         if (data.dsize != sizeof(uint32_t)) {
1993                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1994                                  "expexting %lu\n", (long unsigned)data.dsize,
1995                                  (long unsigned)sizeof(uint32_t)));
1996                 return;
1997         }
1998         if (data.dptr == NULL) {
1999                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
2000                 return;
2001         }
2002
2003         timeout = *((uint32_t *)data.dptr);
2004         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
2005
2006         rec->ip_check_disable_ctx = talloc_new(rec);
2007         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2008
2009         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2010 }
2011
2012
2013 /*
2014   handler for ip reallocate, just add it to the list of callers and 
2015   handle this later in the monitor_cluster loop so we do not recurse
2016   with other callers to takeover_run()
2017 */
2018 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2019                              TDB_DATA data, void *private_data)
2020 {
2021         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2022         struct ip_reallocate_list *caller;
2023
2024         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2025                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2026                 return;
2027         }
2028
2029         if (rec->ip_reallocate_ctx == NULL) {
2030                 rec->ip_reallocate_ctx = talloc_new(rec);
2031                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2032         }
2033
2034         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2035         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2036
2037         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2038         caller->next = rec->reallocate_callers;
2039         rec->reallocate_callers = caller;
2040
2041         return;
2042 }
2043
2044 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2045 {
2046         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2047         TDB_DATA result;
2048         int32_t ret;
2049         struct ip_reallocate_list *callers;
2050         uint32_t culprit;
2051
2052         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2053
2054         /* update the list of public ips that a node can handle for
2055            all connected nodes
2056         */
2057         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2058         if (ret != 0) {
2059                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2060                                  culprit));
2061                 rec->need_takeover_run = true;
2062         }
2063         if (ret == 0) {
2064                 ret = ctdb_takeover_run(ctdb, rec->nodemap, NULL, NULL);
2065                 if (ret != 0) {
2066                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2067                         rec->need_takeover_run = true;
2068                 }
2069         }
2070
2071         result.dsize = sizeof(int32_t);
2072         result.dptr  = (uint8_t *)&ret;
2073
2074         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2075
2076                 /* Someone that sent srvid==0 does not want a reply */
2077                 if (callers->rd->srvid == 0) {
2078                         continue;
2079                 }
2080                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2081                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2082                                   (unsigned long long)callers->rd->srvid));
2083                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2084                 if (ret != 0) {
2085                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2086                                          "message to %u:%llu\n",
2087                                          (unsigned)callers->rd->pnn,
2088                                          (unsigned long long)callers->rd->srvid));
2089                 }
2090         }
2091
2092         talloc_free(tmp_ctx);
2093         talloc_free(rec->ip_reallocate_ctx);
2094         rec->ip_reallocate_ctx = NULL;
2095         rec->reallocate_callers = NULL;
2096         
2097 }
2098
2099
2100 /*
2101   handler for recovery master elections
2102 */
2103 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2104                              TDB_DATA data, void *private_data)
2105 {
2106         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2107         int ret;
2108         struct election_message *em = (struct election_message *)data.dptr;
2109         TALLOC_CTX *mem_ctx;
2110
2111         /* we got an election packet - update the timeout for the election */
2112         talloc_free(rec->election_timeout);
2113         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2114                                                 fast_start ?
2115                                                 timeval_current_ofs(0, 500000) :
2116                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2117                                                 ctdb_election_timeout, rec);
2118
2119         mem_ctx = talloc_new(ctdb);
2120
2121         /* someone called an election. check their election data
2122            and if we disagree and we would rather be the elected node, 
2123            send a new election message to all other nodes
2124          */
2125         if (ctdb_election_win(rec, em)) {
2126                 if (!rec->send_election_te) {
2127                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2128                                                                 timeval_current_ofs(0, 500000),
2129                                                                 election_send_request, rec);
2130                 }
2131                 talloc_free(mem_ctx);
2132                 /*unban_all_nodes(ctdb);*/
2133                 return;
2134         }
2135         
2136         /* we didn't win */
2137         talloc_free(rec->send_election_te);
2138         rec->send_election_te = NULL;
2139
2140         if (ctdb->tunable.verify_recovery_lock != 0) {
2141                 /* release the recmaster lock */
2142                 if (em->pnn != ctdb->pnn &&
2143                     ctdb->recovery_lock_fd != -1) {
2144                         close(ctdb->recovery_lock_fd);
2145                         ctdb->recovery_lock_fd = -1;
2146                         unban_all_nodes(ctdb);
2147                 }
2148         }
2149
2150         /* ok, let that guy become recmaster then */
2151         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2152         if (ret != 0) {
2153                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2154                 talloc_free(mem_ctx);
2155                 return;
2156         }
2157
2158         talloc_free(mem_ctx);
2159         return;
2160 }
2161
2162
2163 /*
2164   force the start of the election process
2165  */
2166 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2167                            struct ctdb_node_map *nodemap)
2168 {
2169         int ret;
2170         struct ctdb_context *ctdb = rec->ctdb;
2171
2172         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2173
2174         /* set all nodes to recovery mode to stop all internode traffic */
2175         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2176         if (ret != 0) {
2177                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2178                 return;
2179         }
2180
2181         talloc_free(rec->election_timeout);
2182         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2183                                                 fast_start ?
2184                                                 timeval_current_ofs(0, 500000) :
2185                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2186                                                 ctdb_election_timeout, rec);
2187
2188         ret = send_election_request(rec, pnn, true);
2189         if (ret!=0) {
2190                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2191                 return;
2192         }
2193
2194         /* wait for a few seconds to collect all responses */
2195         ctdb_wait_election(rec);
2196 }
2197
2198
2199
2200 /*
2201   handler for when a node changes its flags
2202 */
2203 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2204                             TDB_DATA data, void *private_data)
2205 {
2206         int ret;
2207         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2208         struct ctdb_node_map *nodemap=NULL;
2209         TALLOC_CTX *tmp_ctx;
2210         uint32_t changed_flags;
2211         int i;
2212         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2213         int disabled_flag_changed;
2214
2215         if (data.dsize != sizeof(*c)) {
2216                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2217                 return;
2218         }
2219
2220         tmp_ctx = talloc_new(ctdb);
2221         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2222
2223         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2224         if (ret != 0) {
2225                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2226                 talloc_free(tmp_ctx);
2227                 return;         
2228         }
2229
2230
2231         for (i=0;i<nodemap->num;i++) {
2232                 if (nodemap->nodes[i].pnn == c->pnn) break;
2233         }
2234
2235         if (i == nodemap->num) {
2236                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2237                 talloc_free(tmp_ctx);
2238                 return;
2239         }
2240
2241         changed_flags = c->old_flags ^ c->new_flags;
2242
2243         if (nodemap->nodes[i].flags != c->new_flags) {
2244                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2245         }
2246
2247         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2248
2249         nodemap->nodes[i].flags = c->new_flags;
2250
2251         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2252                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2253
2254         if (ret == 0) {
2255                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2256                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2257         }
2258         
2259         if (ret == 0 &&
2260             ctdb->recovery_master == ctdb->pnn &&
2261             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2262                 /* Only do the takeover run if the perm disabled or unhealthy
2263                    flags changed since these will cause an ip failover but not
2264                    a recovery.
2265                    If the node became disconnected or banned this will also
2266                    lead to an ip address failover but that is handled 
2267                    during recovery
2268                 */
2269                 if (disabled_flag_changed) {
2270                         rec->need_takeover_run = true;
2271                 }
2272         }
2273
2274         talloc_free(tmp_ctx);
2275 }
2276
2277 /*
2278   handler for when we need to push out flag changes ot all other nodes
2279 */
2280 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2281                             TDB_DATA data, void *private_data)
2282 {
2283         int ret;
2284         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2285         struct ctdb_node_map *nodemap=NULL;
2286         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2287         uint32_t recmaster;
2288         uint32_t *nodes;
2289
2290         /* find the recovery master */
2291         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2292         if (ret != 0) {
2293                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2294                 talloc_free(tmp_ctx);
2295                 return;
2296         }
2297
2298         /* read the node flags from the recmaster */
2299         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2300         if (ret != 0) {
2301                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2302                 talloc_free(tmp_ctx);
2303                 return;
2304         }
2305         if (c->pnn >= nodemap->num) {
2306                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2307                 talloc_free(tmp_ctx);
2308                 return;
2309         }
2310
2311         /* send the flags update to all connected nodes */
2312         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2313
2314         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2315                                       nodes, 0, CONTROL_TIMEOUT(),
2316                                       false, data,
2317                                       NULL, NULL,
2318                                       NULL) != 0) {
2319                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2320
2321                 talloc_free(tmp_ctx);
2322                 return;
2323         }
2324
2325         talloc_free(tmp_ctx);
2326 }
2327
2328
2329 struct verify_recmode_normal_data {
2330         uint32_t count;
2331         enum monitor_result status;
2332 };
2333
2334 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2335 {
2336         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2337
2338
2339         /* one more node has responded with recmode data*/
2340         rmdata->count--;
2341
2342         /* if we failed to get the recmode, then return an error and let
2343            the main loop try again.
2344         */
2345         if (state->state != CTDB_CONTROL_DONE) {
2346                 if (rmdata->status == MONITOR_OK) {
2347                         rmdata->status = MONITOR_FAILED;
2348                 }
2349                 return;
2350         }
2351
2352         /* if we got a response, then the recmode will be stored in the
2353            status field
2354         */
2355         if (state->status != CTDB_RECOVERY_NORMAL) {
2356                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2357                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2358         }
2359
2360         return;
2361 }
2362
2363
2364 /* verify that all nodes are in normal recovery mode */
2365 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2366 {
2367         struct verify_recmode_normal_data *rmdata;
2368         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2369         struct ctdb_client_control_state *state;
2370         enum monitor_result status;
2371         int j;
2372         
2373         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2374         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2375         rmdata->count  = 0;
2376         rmdata->status = MONITOR_OK;
2377
2378         /* loop over all active nodes and send an async getrecmode call to 
2379            them*/
2380         for (j=0; j<nodemap->num; j++) {
2381                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2382                         continue;
2383                 }
2384                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2385                                         CONTROL_TIMEOUT(), 
2386                                         nodemap->nodes[j].pnn);
2387                 if (state == NULL) {
2388                         /* we failed to send the control, treat this as 
2389                            an error and try again next iteration
2390                         */                      
2391                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2392                         talloc_free(mem_ctx);
2393                         return MONITOR_FAILED;
2394                 }
2395
2396                 /* set up the callback functions */
2397                 state->async.fn = verify_recmode_normal_callback;
2398                 state->async.private_data = rmdata;
2399
2400                 /* one more control to wait for to complete */
2401                 rmdata->count++;
2402         }
2403
2404
2405         /* now wait for up to the maximum number of seconds allowed
2406            or until all nodes we expect a response from has replied
2407         */
2408         while (rmdata->count > 0) {
2409                 event_loop_once(ctdb->ev);
2410         }
2411
2412         status = rmdata->status;
2413         talloc_free(mem_ctx);
2414         return status;
2415 }
2416
2417
2418 struct verify_recmaster_data {
2419         struct ctdb_recoverd *rec;
2420         uint32_t count;
2421         uint32_t pnn;
2422         enum monitor_result status;
2423 };
2424
2425 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2426 {
2427         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2428
2429
2430         /* one more node has responded with recmaster data*/
2431         rmdata->count--;
2432
2433         /* if we failed to get the recmaster, then return an error and let
2434            the main loop try again.
2435         */
2436         if (state->state != CTDB_CONTROL_DONE) {
2437                 if (rmdata->status == MONITOR_OK) {
2438                         rmdata->status = MONITOR_FAILED;
2439                 }
2440                 return;
2441         }
2442
2443         /* if we got a response, then the recmaster will be stored in the
2444            status field
2445         */
2446         if (state->status != rmdata->pnn) {
2447                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2448                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2449                 rmdata->status = MONITOR_ELECTION_NEEDED;
2450         }
2451
2452         return;
2453 }
2454
2455
2456 /* verify that all nodes agree that we are the recmaster */
2457 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2458 {
2459         struct ctdb_context *ctdb = rec->ctdb;
2460         struct verify_recmaster_data *rmdata;
2461         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2462         struct ctdb_client_control_state *state;
2463         enum monitor_result status;
2464         int j;
2465         
2466         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2467         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2468         rmdata->rec    = rec;
2469         rmdata->count  = 0;
2470         rmdata->pnn    = pnn;
2471         rmdata->status = MONITOR_OK;
2472
2473         /* loop over all active nodes and send an async getrecmaster call to 
2474            them*/
2475         for (j=0; j<nodemap->num; j++) {
2476                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2477                         continue;
2478                 }
2479                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2480                                         CONTROL_TIMEOUT(),
2481                                         nodemap->nodes[j].pnn);
2482                 if (state == NULL) {
2483                         /* we failed to send the control, treat this as 
2484                            an error and try again next iteration
2485                         */                      
2486                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2487                         talloc_free(mem_ctx);
2488                         return MONITOR_FAILED;
2489                 }
2490
2491                 /* set up the callback functions */
2492                 state->async.fn = verify_recmaster_callback;
2493                 state->async.private_data = rmdata;
2494
2495                 /* one more control to wait for to complete */
2496                 rmdata->count++;
2497         }
2498
2499
2500         /* now wait for up to the maximum number of seconds allowed
2501            or until all nodes we expect a response from has replied
2502         */
2503         while (rmdata->count > 0) {
2504                 event_loop_once(ctdb->ev);
2505         }
2506
2507         status = rmdata->status;
2508         talloc_free(mem_ctx);
2509         return status;
2510 }
2511
2512
2513 /* called to check that the local allocation of public ip addresses is ok.
2514 */
2515 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2516 {
2517         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2518         struct ctdb_control_get_ifaces *ifaces = NULL;
2519         struct ctdb_all_public_ips *ips = NULL;
2520         struct ctdb_uptime *uptime1 = NULL;
2521         struct ctdb_uptime *uptime2 = NULL;
2522         int ret, j;
2523         bool need_iface_check = false;
2524         bool need_takeover_run = false;
2525
2526         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2527                                 CTDB_CURRENT_NODE, &uptime1);
2528         if (ret != 0) {
2529                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2530                 talloc_free(mem_ctx);
2531                 return -1;
2532         }
2533
2534
2535         /* read the interfaces from the local node */
2536         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2537         if (ret != 0) {
2538                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2539                 talloc_free(mem_ctx);
2540                 return -1;
2541         }
2542
2543         if (!rec->ifaces) {
2544                 need_iface_check = true;
2545         } else if (rec->ifaces->num != ifaces->num) {
2546                 need_iface_check = true;
2547         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2548                 need_iface_check = true;
2549         }
2550
2551         if (need_iface_check) {
2552                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2553                                      "local node %u - force takeover run\n",
2554                                      pnn));
2555                 need_takeover_run = true;
2556         }
2557
2558         /* read the ip allocation from the local node */
2559         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2560         if (ret != 0) {
2561                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2562                 talloc_free(mem_ctx);
2563                 return -1;
2564         }
2565
2566         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2567                                 CTDB_CURRENT_NODE, &uptime2);
2568         if (ret != 0) {
2569                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2570                 talloc_free(mem_ctx);
2571                 return -1;
2572         }
2573
2574         /* skip the check if the startrecovery time has changed */
2575         if (timeval_compare(&uptime1->last_recovery_started,
2576                             &uptime2->last_recovery_started) != 0) {
2577                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2578                 talloc_free(mem_ctx);
2579                 return 0;
2580         }
2581
2582         /* skip the check if the endrecovery time has changed */
2583         if (timeval_compare(&uptime1->last_recovery_finished,
2584                             &uptime2->last_recovery_finished) != 0) {
2585                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2586                 talloc_free(mem_ctx);
2587                 return 0;
2588         }
2589
2590         /* skip the check if we have started but not finished recovery */
2591         if (timeval_compare(&uptime1->last_recovery_finished,
2592                             &uptime1->last_recovery_started) != 1) {
2593                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2594                 talloc_free(mem_ctx);
2595
2596                 return 0;
2597         }
2598
2599         talloc_free(rec->ifaces);
2600         rec->ifaces = talloc_steal(rec, ifaces);
2601
2602         /* verify that we have the ip addresses we should have
2603            and we dont have ones we shouldnt have.
2604            if we find an inconsistency we set recmode to
2605            active on the local node and wait for the recmaster
2606            to do a full blown recovery.
2607            also if the pnn is -1 and we are healthy and can host the ip
2608            we also request a ip reallocation.
2609         */
2610         if (ctdb->tunable.disable_ip_failover == 0) {
2611                 for (j=0; j<ips->num; j++) {
2612                         if (ips->ips[j].pnn == -1 && nodemap->nodes[pnn].flags == 0) {
2613                                 DEBUG(DEBUG_CRIT,("Public address '%s' is not assigned and we could serve this ip\n",
2614                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2615                                 need_takeover_run = true;
2616                         } else if (ips->ips[j].pnn == pnn) {
2617                                 if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2618                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2619                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2620                                         need_takeover_run = true;
2621                                 }
2622                         } else {
2623                                 if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2624                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2625                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2626                                         need_takeover_run = true;
2627                                 }
2628                         }
2629                 }
2630         }
2631
2632         if (need_takeover_run) {
2633                 struct takeover_run_reply rd;
2634                 TDB_DATA data;
2635
2636                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2637
2638                 rd.pnn = ctdb->pnn;
2639                 rd.srvid = 0;
2640                 data.dptr = (uint8_t *)&rd;
2641                 data.dsize = sizeof(rd);
2642
2643                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2644                 if (ret != 0) {
2645                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2646                 }
2647         }
2648         talloc_free(mem_ctx);
2649         return 0;
2650 }
2651
2652
2653 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2654 {
2655         struct ctdb_node_map **remote_nodemaps = callback_data;
2656
2657         if (node_pnn >= ctdb->num_nodes) {
2658                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2659                 return;
2660         }
2661
2662         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2663
2664 }
2665
2666 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2667         struct ctdb_node_map *nodemap,
2668         struct ctdb_node_map **remote_nodemaps)
2669 {
2670         uint32_t *nodes;
2671
2672         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2673         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2674                                         nodes, 0,
2675                                         CONTROL_TIMEOUT(), false, tdb_null,
2676                                         async_getnodemap_callback,
2677                                         NULL,
2678                                         remote_nodemaps) != 0) {
2679                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2680
2681                 return -1;
2682         }
2683
2684         return 0;
2685 }
2686
/* progress of a recovery lock check.  RECLOCK_OK and RECLOCK_FAILED are
   also the byte values the check_reclock child writes to its pipe, so
   the numbering must not change.
*/
enum reclock_child_status {
        RECLOCK_CHECKING = 0,   /* no answer from the child yet */
        RECLOCK_OK       = 1,   /* the child reported RECLOCK_OK */
        RECLOCK_FAILED   = 2,   /* bad or missing answer from the child */
        RECLOCK_TIMEOUT  = 3    /* the timeout fired before an answer */
};
2688 struct ctdb_check_reclock_state {
2689         struct ctdb_context *ctdb;
2690         struct timeval start_time;
2691         int fd[2];
2692         pid_t child;
2693         struct timed_event *te;
2694         struct fd_event *fde;
2695         enum reclock_child_status status;
2696 };
2697
2698 /* when we free the reclock state we must kill any child process.
2699 */
2700 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2701 {
2702         struct ctdb_context *ctdb = state->ctdb;
2703
2704         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2705
2706         if (state->fd[0] != -1) {
2707                 close(state->fd[0]);
2708                 state->fd[0] = -1;
2709         }
2710         if (state->fd[1] != -1) {
2711                 close(state->fd[1]);
2712                 state->fd[1] = -1;
2713         }
2714         kill(state->child, SIGKILL);
2715         return 0;
2716 }
2717
2718 /*
2719   called if our check_reclock child times out. this would happen if
2720   i/o to the reclock file blocks.
2721  */
2722 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2723                                          struct timeval t, void *private_data)
2724 {
2725         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2726                                            struct ctdb_check_reclock_state);
2727
2728         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2729         state->status = RECLOCK_TIMEOUT;
2730 }
2731
2732 /* this is called when the child process has completed checking the reclock
2733    file and has written data back to us through the pipe.
2734 */
2735 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2736                              uint16_t flags, void *private_data)
2737 {
2738         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2739                                              struct ctdb_check_reclock_state);
2740         char c = 0;
2741         int ret;
2742
2743         /* we got a response from our child process so we can abort the
2744            timeout.
2745         */
2746         talloc_free(state->te);
2747         state->te = NULL;
2748
2749         ret = read(state->fd[0], &c, 1);
2750         if (ret != 1 || c != RECLOCK_OK) {
2751                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2752                 state->status = RECLOCK_FAILED;
2753
2754                 return;
2755         }
2756
2757         state->status = RECLOCK_OK;
2758         return;
2759 }
2760
2761 static int check_recovery_lock(struct ctdb_context *ctdb)
2762 {
2763         int ret;
2764         struct ctdb_check_reclock_state *state;
2765         pid_t parent = getpid();
2766
2767         if (ctdb->recovery_lock_fd == -1) {
2768                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2769                 return -1;
2770         }
2771
2772         state = talloc(ctdb, struct ctdb_check_reclock_state);
2773         CTDB_NO_MEMORY(ctdb, state);
2774
2775         state->ctdb = ctdb;
2776         state->start_time = timeval_current();
2777         state->status = RECLOCK_CHECKING;
2778         state->fd[0] = -1;
2779         state->fd[1] = -1;
2780
2781         ret = pipe(state->fd);
2782         if (ret != 0) {
2783                 talloc_free(state);
2784                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2785                 return -1;
2786         }
2787
2788         state->child = ctdb_fork(ctdb);
2789         if (state->child == (pid_t)-1) {
2790                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2791                 close(state->fd[0]);
2792                 state->fd[0] = -1;
2793                 close(state->fd[1]);
2794                 state->fd[1] = -1;
2795                 talloc_free(state);
2796                 return -1;
2797         }
2798
2799         if (state->child == 0) {
2800                 char cc = RECLOCK_OK;
2801                 close(state->fd[0]);
2802                 state->fd[0] = -1;
2803
2804                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
2805                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2806                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2807                         cc = RECLOCK_FAILED;
2808                 }
2809
2810                 write(state->fd[1], &cc, 1);
2811                 /* make sure we die when our parent dies */
2812                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2813                         sleep(5);
2814                         write(state->fd[1], &cc, 1);
2815                 }
2816                 _exit(0);
2817         }
2818         close(state->fd[1]);
2819         state->fd[1] = -1;
2820         set_close_on_exec(state->fd[0]);
2821
2822         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2823
2824         talloc_set_destructor(state, check_reclock_destructor);
2825
2826         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2827                                     ctdb_check_reclock_timeout, state);
2828         if (state->te == NULL) {
2829                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2830                 talloc_free(state);
2831                 return -1;
2832         }
2833
2834         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2835                                 EVENT_FD_READ,
2836                                 reclock_child_handler,
2837                                 (void *)state);
2838
2839         if (state->fde == NULL) {
2840                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2841                 talloc_free(state);
2842                 return -1;
2843         }
2844         tevent_fd_set_auto_close(state->fde);
2845
2846         while (state->status == RECLOCK_CHECKING) {
2847                 event_loop_once(ctdb->ev);
2848         }
2849
2850         if (state->status == RECLOCK_FAILED) {
2851                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2852                 close(ctdb->recovery_lock_fd);
2853                 ctdb->recovery_lock_fd = -1;
2854                 talloc_free(state);
2855                 return -1;
2856         }
2857
2858         talloc_free(state);
2859         return 0;
2860 }
2861
2862 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2863 {
2864         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2865         const char *reclockfile;
2866
2867         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2868                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2869                 talloc_free(tmp_ctx);
2870                 return -1;      
2871         }
2872
2873         if (reclockfile == NULL) {
2874                 if (ctdb->recovery_lock_file != NULL) {
2875                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2876                         talloc_free(ctdb->recovery_lock_file);
2877                         ctdb->recovery_lock_file = NULL;
2878                         if (ctdb->recovery_lock_fd != -1) {
2879                                 close(ctdb->recovery_lock_fd);
2880                                 ctdb->recovery_lock_fd = -1;
2881                         }
2882                 }
2883                 ctdb->tunable.verify_recovery_lock = 0;
2884                 talloc_free(tmp_ctx);
2885                 return 0;
2886         }
2887
2888         if (ctdb->recovery_lock_file == NULL) {
2889                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2890                 if (ctdb->recovery_lock_fd != -1) {
2891                         close(ctdb->recovery_lock_fd);
2892                         ctdb->recovery_lock_fd = -1;
2893                 }
2894                 talloc_free(tmp_ctx);
2895                 return 0;
2896         }
2897
2898
2899         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2900                 talloc_free(tmp_ctx);
2901                 return 0;
2902         }
2903
2904         talloc_free(ctdb->recovery_lock_file);
2905         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2906         ctdb->tunable.verify_recovery_lock = 0;
2907         if (ctdb->recovery_lock_fd != -1) {
2908                 close(ctdb->recovery_lock_fd);
2909                 ctdb->recovery_lock_fd = -1;
2910         }
2911
2912         talloc_free(tmp_ctx);
2913         return 0;
2914 }
2915
2916 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
2917                       TALLOC_CTX *mem_ctx)
2918 {
2919         uint32_t pnn;
2920         struct ctdb_node_map *nodemap=NULL;
2921         struct ctdb_node_map *recmaster_nodemap=NULL;
2922         struct ctdb_node_map **remote_nodemaps=NULL;
2923         struct ctdb_vnn_map *vnnmap=NULL;
2924         struct ctdb_vnn_map *remote_vnnmap=NULL;
2925         int32_t debug_level;
2926         int i, j, ret;
2927
2928
2929
2930         /* verify that the main daemon is still running */
2931         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2932                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2933                 exit(-1);
2934         }
2935
2936         /* ping the local daemon to tell it we are alive */
2937         ctdb_ctrl_recd_ping(ctdb);
2938
2939         if (rec->election_timeout) {
2940                 /* an election is in progress */
2941                 return;
2942         }
2943
2944         /* read the debug level from the parent and update locally */
2945         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2946         if (ret !=0) {
2947                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2948                 return;
2949         }
2950         LogLevel = debug_level;
2951
2952
2953         /* We must check if we need to ban a node here but we want to do this
2954            as early as possible so we dont wait until we have pulled the node
2955            map from the local node. thats why we have the hardcoded value 20
2956         */
2957         for (i=0; i<ctdb->num_nodes; i++) {
2958                 struct ctdb_banning_state *ban_state;
2959
2960                 if (ctdb->nodes[i]->ban_state == NULL) {
2961                         continue;
2962                 }
2963                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2964                 if (ban_state->count < 20) {
2965                         continue;
2966                 }
2967                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2968                         ctdb->nodes[i]->pnn, ban_state->count,
2969                         ctdb->tunable.recovery_ban_period));
2970                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2971                 ban_state->count = 0;
2972         }
2973
2974         /* get relevant tunables */
2975         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2976         if (ret != 0) {
2977                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2978                 return;
2979         }
2980
2981         /* get the current recovery lock file from the server */
2982         if (update_recovery_lock_file(ctdb) != 0) {
2983                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2984                 return;
2985         }
2986
2987         /* Make sure that if recovery lock verification becomes disabled when
2988            we close the file
2989         */
2990         if (ctdb->tunable.verify_recovery_lock == 0) {
2991                 if (ctdb->recovery_lock_fd != -1) {
2992                         close(ctdb->recovery_lock_fd);
2993                         ctdb->recovery_lock_fd = -1;
2994                 }
2995         }
2996
2997         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2998         if (pnn == (uint32_t)-1) {
2999                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
3000                 return;
3001         }
3002
3003         /* get the vnnmap */
3004         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3005         if (ret != 0) {
3006                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3007                 return;
3008         }
3009
3010
3011         /* get number of nodes */
3012         if (rec->nodemap) {
3013                 talloc_free(rec->nodemap);
3014                 rec->nodemap = NULL;
3015                 nodemap=NULL;
3016         }
3017         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3018         if (ret != 0) {
3019                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3020                 return;
3021         }
3022         nodemap = rec->nodemap;
3023
3024         /* check which node is the recovery master */
3025         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3026         if (ret != 0) {
3027                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3028                 return;
3029         }
3030
3031         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3032         if (rec->recmaster != pnn) {
3033                 if (rec->ip_reallocate_ctx != NULL) {
3034                         talloc_free(rec->ip_reallocate_ctx);
3035                         rec->ip_reallocate_ctx = NULL;
3036                         rec->reallocate_callers = NULL;
3037                 }
3038         }
3039         /* if there are takeovers requested, perform it and notify the waiters */
3040         if (rec->reallocate_callers) {
3041                 process_ipreallocate_requests(ctdb, rec);
3042         }
3043
3044         if (rec->recmaster == (uint32_t)-1) {
3045                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3046                 force_election(rec, pnn, nodemap);
3047                 return;
3048         }
3049
3050
3051         /* if the local daemon is STOPPED, we verify that the databases are
3052            also frozen and thet the recmode is set to active 
3053         */
3054         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3055                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3056                 if (ret != 0) {
3057                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3058                 }
3059                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3060                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3061
3062                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3063                         if (ret != 0) {
3064                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3065                                 return;
3066                         }
3067                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3068                         if (ret != 0) {
3069                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3070
3071                                 return;
3072                         }
3073                         return;
3074                 }
3075         }
3076         /* If the local node is stopped, verify we are not the recmaster 
3077            and yield this role if so
3078         */
3079         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3080                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3081                 force_election(rec, pnn, nodemap);
3082                 return;
3083         }
3084         
3085         /* check that we (recovery daemon) and the local ctdb daemon
3086            agrees on whether we are banned or not
3087         */
3088 //qqq
3089
3090         /* remember our own node flags */
3091         rec->node_flags = nodemap->nodes[pnn].flags;
3092
3093         /* count how many active nodes there are */
3094         rec->num_active    = 0;
3095         rec->num_connected = 0;
3096         for (i=0; i<nodemap->num; i++) {
3097                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3098                         rec->num_active++;
3099                 }
3100                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3101                         rec->num_connected++;
3102                 }
3103         }
3104
3105
3106         /* verify that the recmaster node is still active */
3107         for (j=0; j<nodemap->num; j++) {
3108                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3109                         break;
3110                 }
3111         }
3112
3113         if (j == nodemap->num) {
3114                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3115                 force_election(rec, pnn, nodemap);
3116                 return;
3117         }
3118
3119         /* if recovery master is disconnected we must elect a new recmaster */
3120         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3121                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3122                 force_election(rec, pnn, nodemap);
3123                 return;
3124         }
3125
3126         /* grap the nodemap from the recovery master to check if it is banned */
3127         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3128                                    mem_ctx, &recmaster_nodemap);
3129         if (ret != 0) {
3130                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3131                           nodemap->nodes[j].pnn));
3132                 return;
3133         }
3134
3135
3136         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3137                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3138                 force_election(rec, pnn, nodemap);
3139                 return;
3140         }
3141
3142
3143         /* verify that we have all ip addresses we should have and we dont
3144          * have addresses we shouldnt have.
3145          */ 
3146         if (ctdb->tunable.disable_ip_failover == 0) {
3147                 if (rec->ip_check_disable_ctx == NULL) {
3148                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3149                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3150                         }
3151                 }
3152         }
3153
3154
3155         /* if we are not the recmaster then we do not need to check
3156            if recovery is needed
3157          */
3158         if (pnn != rec->recmaster) {
3159                 return;
3160         }
3161
3162
3163         /* ensure our local copies of flags are right */
3164         ret = update_local_flags(rec, nodemap);
3165         if (ret == MONITOR_ELECTION_NEEDED) {
3166                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3167                 force_election(rec, pnn, nodemap);
3168                 return;
3169         }
3170         if (ret != MONITOR_OK) {
3171                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3172                 return;
3173         }
3174
3175         if (ctdb->num_nodes != nodemap->num) {
3176                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3177                 reload_nodes_file(ctdb);
3178                 return;
3179         }
3180
3181         /* verify that all active nodes agree that we are the recmaster */
3182         switch (verify_recmaster(rec, nodemap, pnn)) {
3183         case MONITOR_RECOVERY_NEEDED:
3184                 /* can not happen */
3185                 return;
3186         case MONITOR_ELECTION_NEEDED:
3187                 force_election(rec, pnn, nodemap);
3188                 return;
3189         case MONITOR_OK:
3190                 break;
3191         case MONITOR_FAILED:
3192                 return;
3193         }
3194
3195
3196         if (rec->need_recovery) {
3197                 /* a previous recovery didn't finish */
3198                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3199                 return;
3200         }
3201
3202         /* verify that all active nodes are in normal mode 
3203            and not in recovery mode 
3204         */
3205         switch (verify_recmode(ctdb, nodemap)) {
3206         case MONITOR_RECOVERY_NEEDED:
3207                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3208                 return;
3209         case MONITOR_FAILED:
3210                 return;
3211         case MONITOR_ELECTION_NEEDED:
3212                 /* can not happen */
3213         case MONITOR_OK:
3214                 break;
3215         }
3216
3217
3218         if (ctdb->tunable.verify_recovery_lock != 0) {
3219                 /* we should have the reclock - check its not stale */
3220                 ret = check_recovery_lock(ctdb);
3221                 if (ret != 0) {
3222                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3223                         ctdb_set_culprit(rec, ctdb->pnn);
3224                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3225                         return;
3226                 }
3227         }
3228
3229         /* get the nodemap for all active remote nodes
3230          */
3231         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3232         if (remote_nodemaps == NULL) {
3233                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3234                 return;
3235         }
3236         for(i=0; i<nodemap->num; i++) {
3237                 remote_nodemaps[i] = NULL;
3238         }
3239         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3240                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3241                 return;
3242         } 
3243
3244         /* verify that all other nodes have the same nodemap as we have
3245         */
3246         for (j=0; j<nodemap->num; j++) {
3247                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3248                         continue;
3249                 }
3250
3251                 if (remote_nodemaps[j] == NULL) {
3252                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3253                         ctdb_set_culprit(rec, j);
3254
3255                         return;
3256                 }
3257
3258                 /* if the nodes disagree on how many nodes there are
3259                    then this is a good reason to try recovery
3260                  */
3261                 if (remote_nodemaps[j]->num != nodemap->num) {
3262                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3263                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3264                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3265                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3266                         return;
3267                 }
3268
3269                 /* if the nodes disagree on which nodes exist and are
3270                    active, then that is also a good reason to do recovery
3271                  */
3272                 for (i=0;i<nodemap->num;i++) {
3273                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3274                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3275                                           nodemap->nodes[j].pnn, i, 
3276                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3277                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3278                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3279                                             vnnmap);
3280                                 return;
3281                         }
3282                 }
3283
3284                 /* verify the flags are consistent
3285                 */
3286                 for (i=0; i<nodemap->num; i++) {
3287                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3288                                 continue;
3289                         }
3290                         
3291                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3292                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3293                                   nodemap->nodes[j].pnn, 
3294                                   nodemap->nodes[i].pnn, 
3295                                   remote_nodemaps[j]->nodes[i].flags,
3296                                   nodemap->nodes[j].flags));
3297                                 if (i == j) {
3298                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3299                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3300                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3301                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3302                                                     vnnmap);
3303                                         return;
3304                                 } else {
3305                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3306                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3307                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3308                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3309                                                     vnnmap);
3310                                         return;
3311                                 }
3312                         }
3313                 }
3314         }
3315
3316
3317         /* there better be the same number of lmasters in the vnn map
3318            as there are active nodes or we will have to do a recovery
3319          */
3320         if (vnnmap->size != rec->num_active) {
3321                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3322                           vnnmap->size, rec->num_active));
3323                 ctdb_set_culprit(rec, ctdb->pnn);
3324                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3325                 return;
3326         }
3327
3328         /* verify that all active nodes in the nodemap also exist in 
3329            the vnnmap.
3330          */
3331         for (j=0; j<nodemap->num; j++) {
3332                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3333                         continue;
3334                 }
3335                 if (nodemap->nodes[j].pnn == pnn) {
3336                         continue;
3337                 }
3338
3339                 for (i=0; i<vnnmap->size; i++) {
3340                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3341                                 break;
3342                         }
3343                 }
3344                 if (i == vnnmap->size) {
3345                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3346                                   nodemap->nodes[j].pnn));
3347                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3348                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3349                         return;
3350                 }
3351         }
3352
3353         
3354         /* verify that all other nodes have the same vnnmap
3355            and are from the same generation
3356          */
3357         for (j=0; j<nodemap->num; j++) {
3358                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3359                         continue;
3360                 }
3361                 if (nodemap->nodes[j].pnn == pnn) {
3362                         continue;
3363                 }
3364
3365                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3366                                           mem_ctx, &remote_vnnmap);
3367                 if (ret != 0) {
3368                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3369                                   nodemap->nodes[j].pnn));
3370                         return;
3371                 }
3372
3373                 /* verify the vnnmap generation is the same */
3374                 if (vnnmap->generation != remote_vnnmap->generation) {
3375                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3376                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3377                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3378                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3379                         return;
3380                 }
3381
3382                 /* verify the vnnmap size is the same */
3383                 if (vnnmap->size != remote_vnnmap->size) {
3384                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3385                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3386                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3387                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3388                         return;
3389                 }
3390
3391                 /* verify the vnnmap is the same */
3392                 for (i=0;i<vnnmap->size;i++) {
3393                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3394                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3395                                           nodemap->nodes[j].pnn));
3396                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3397                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3398                                             vnnmap);
3399                                 return;
3400                         }
3401                 }
3402         }
3403
3404         /* we might need to change who has what IP assigned */
3405         if (rec->need_takeover_run) {
3406                 uint32_t culprit = (uint32_t)-1;
3407
3408                 rec->need_takeover_run = false;
3409
3410                 /* update the list of public ips that a node can handle for
3411                    all connected nodes
3412                 */
3413                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3414                 if (ret != 0) {
3415                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3416                                          culprit));
3417                         rec->need_takeover_run = true;
3418                         return;
3419                 }
3420
3421                 /* execute the "startrecovery" event script on all nodes */
3422                 ret = run_startrecovery_eventscript(rec, nodemap);
3423                 if (ret!=0) {
3424                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3425                         ctdb_set_culprit(rec, ctdb->pnn);
3426                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3427                         return;
3428                 }
3429
3430                 /* If takeover run fails, then the offending nodes are
3431                  * assigned ban culprit counts. And we re-try takeover.
3432                  * If takeover run fails repeatedly, the node would get
3433                  * banned.
3434                  *
3435                  * If rec->need_takeover_run is not set to true at this
3436                  * failure, monitoring is disabled cluster-wide (via
3437                  * startrecovery eventscript) and will not get enabled.
3438                  */
3439                 ret = ctdb_takeover_run(ctdb, nodemap, takeover_fail_callback, rec);
3440                 if (ret != 0) {
3441                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Trying again\n"));
3442                         return;
3443                 }
3444
3445                 /* execute the "recovered" event script on all nodes */
3446                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3447 #if 0
3448 // we cant check whether the event completed successfully
3449 // since this script WILL fail if the node is in recovery mode
3450 // and if that race happens, the code here would just cause a second
3451 // cascading recovery.
3452                 if (ret!=0) {
3453                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3454                         ctdb_set_culprit(rec, ctdb->pnn);
3455                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3456                 }
3457 #endif
3458         }
3459 }
3460
3461 /*
3462   the main monitoring loop
3463  */
3464 static void monitor_cluster(struct ctdb_context *ctdb)
3465 {
3466         struct ctdb_recoverd *rec;
3467
3468         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3469
3470         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3471         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3472
3473         rec->ctdb = ctdb;
3474
3475         rec->priority_time = timeval_current();
3476
3477         /* register a message port for sending memory dumps */
3478         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3479
3480         /* register a message port for recovery elections */
3481         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3482
3483         /* when nodes are disabled/enabled */
3484         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3485
3486         /* when we are asked to puch out a flag change */
3487         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3488
3489         /* register a message port for vacuum fetch */
3490         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3491
3492         /* register a message port for reloadnodes  */
3493         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3494
3495         /* register a message port for performing a takeover run */
3496         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3497
3498         /* register a message port for disabling the ip check for a short while */
3499         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3500
3501         /* register a message port for updating the recovery daemons node assignment for an ip */
3502         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3503
3504         for (;;) {
3505                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3506                 struct timeval start;
3507                 double elapsed;
3508
3509                 if (!mem_ctx) {
3510                         DEBUG(DEBUG_CRIT,(__location__
3511                                           " Failed to create temp context\n"));
3512                         exit(-1);
3513                 }
3514
3515                 start = timeval_current();
3516                 main_loop(ctdb, rec, mem_ctx);
3517                 talloc_free(mem_ctx);
3518
3519                 /* we only check for recovery once every second */
3520                 elapsed = timeval_elapsed(&start);
3521                 if (elapsed < ctdb->tunable.recover_interval) {
3522                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3523                                           - elapsed);
3524                 }
3525         }
3526 }
3527
3528 /*
3529   event handler for when the main ctdbd dies
3530  */
3531 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3532                                  uint16_t flags, void *private_data)
3533 {
3534         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3535         _exit(1);
3536 }
3537
3538 /*
3539   called regularly to verify that the recovery daemon is still running
3540  */
3541 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3542                               struct timeval yt, void *p)
3543 {
3544         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3545
3546         if (kill(ctdb->recoverd_pid, 0) != 0) {
3547                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3548
3549                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3550                                 ctdb_restart_recd, ctdb);
3551
3552                 return;
3553         }
3554
3555         event_add_timed(ctdb->ev, ctdb, 
3556                         timeval_current_ofs(30, 0),
3557                         ctdb_check_recd, ctdb);
3558 }
3559
3560 static void recd_sig_child_handler(struct event_context *ev,
3561         struct signal_event *se, int signum, int count,
3562         void *dont_care, 
3563         void *private_data)
3564 {
3565 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3566         int status;
3567         pid_t pid = -1;
3568
3569         while (pid != 0) {
3570                 pid = waitpid(-1, &status, WNOHANG);
3571                 if (pid == -1) {
3572                         if (errno != ECHILD) {
3573                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3574                         }
3575                         return;
3576                 }
3577                 if (pid > 0) {
3578                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3579                 }
3580         }
3581 }
3582
3583 /*
3584   startup the recovery daemon as a child of the main ctdb daemon
3585  */
3586 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3587 {
3588         int fd[2];
3589         struct signal_event *se;
3590         struct tevent_fd *fde;
3591
3592         if (pipe(fd) != 0) {
3593                 return -1;
3594         }
3595
3596         ctdb->ctdbd_pid = getpid();
3597
3598         ctdb->recoverd_pid = fork();
3599         if (ctdb->recoverd_pid == -1) {
3600                 return -1;
3601         }
3602         
3603         if (ctdb->recoverd_pid != 0) {
3604                 close(fd[0]);
3605                 event_add_timed(ctdb->ev, ctdb, 
3606                                 timeval_current_ofs(30, 0),
3607                                 ctdb_check_recd, ctdb);
3608                 return 0;
3609         }
3610
3611         close(fd[1]);
3612
3613         srandom(getpid() ^ time(NULL));
3614
3615         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3616                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3617                 exit(1);
3618         }
3619
3620         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3621
3622         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3623                      ctdb_recoverd_parent, &fd[0]);     
3624         tevent_fd_set_auto_close(fde);
3625
3626         /* set up a handler to pick up sigchld */
3627         se = event_add_signal(ctdb->ev, ctdb,
3628                                      SIGCHLD, 0,
3629                                      recd_sig_child_handler,
3630                                      ctdb);
3631         if (se == NULL) {
3632                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3633                 exit(1);
3634         }
3635
3636         monitor_cluster(ctdb);
3637
3638         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3639         return -1;
3640 }
3641
3642 /*
3643   shutdown the recovery daemon
3644  */
3645 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3646 {
3647         if (ctdb->recoverd_pid == 0) {
3648                 return;
3649         }
3650
3651         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3652         kill(ctdb->recoverd_pid, SIGTERM);
3653 }
3654
3655 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
3656                        struct timeval t, void *private_data)
3657 {
3658         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3659
3660         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3661         ctdb_stop_recoverd(ctdb);
3662         ctdb_start_recoverd(ctdb);
3663 }