Merge commit 'obnox/master-rebase'
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66 };
67
68 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
69 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
70
71
72 /*
73   ban a node for a period of time
74  */
75 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
76 {
77         int ret;
78         struct ctdb_context *ctdb = rec->ctdb;
79         struct ctdb_ban_time bantime;
80        
81         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
82
83         if (!ctdb_validate_pnn(ctdb, pnn)) {
84                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
85                 return;
86         }
87
88         bantime.pnn  = pnn;
89         bantime.time = ban_time;
90
91         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
92         if (ret != 0) {
93                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
94                 return;
95         }
96
97 }
98
99 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
100
101
102 /*
103   run the "recovered" eventscript on all nodes
104  */
105 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
106 {
107         TALLOC_CTX *tmp_ctx;
108         uint32_t *nodes;
109
110         tmp_ctx = talloc_new(ctdb);
111         CTDB_NO_MEMORY(ctdb, tmp_ctx);
112
113         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
114         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
115                                         nodes,
116                                         CONTROL_TIMEOUT(), false, tdb_null,
117                                         NULL, NULL,
118                                         NULL) != 0) {
119                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
120
121                 talloc_free(tmp_ctx);
122                 return -1;
123         }
124
125         talloc_free(tmp_ctx);
126         return 0;
127 }
128
129 /*
130   remember the trouble maker
131  */
132 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
133 {
134         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
135         struct ctdb_banning_state *ban_state;
136
137         if (culprit > ctdb->num_nodes) {
138                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
139                 return;
140         }
141
142         if (ctdb->nodes[culprit]->ban_state == NULL) {
143                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
144                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
145
146                 
147         }
148         ban_state = ctdb->nodes[culprit]->ban_state;
149         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
150                 /* this was the first time in a long while this node
151                    misbehaved so we will forgive any old transgressions.
152                 */
153                 ban_state->count = 0;
154         }
155
156         ban_state->count += count;
157         ban_state->last_reported_time = timeval_current();
158         rec->last_culprit_node = culprit;
159 }
160
161 /*
162   remember the trouble maker
163  */
164 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
165 {
166         ctdb_set_culprit_count(rec, culprit, 1);
167 }
168
169
170 /* this callback is called for every node that failed to execute the
171    start recovery event
172 */
173 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
174 {
175         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
176
177         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
178
179         ctdb_set_culprit(rec, node_pnn);
180 }
181
182 /*
183   run the "startrecovery" eventscript on all nodes
184  */
185 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
186 {
187         TALLOC_CTX *tmp_ctx;
188         uint32_t *nodes;
189         struct ctdb_context *ctdb = rec->ctdb;
190
191         tmp_ctx = talloc_new(ctdb);
192         CTDB_NO_MEMORY(ctdb, tmp_ctx);
193
194         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
195         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
196                                         nodes,
197                                         CONTROL_TIMEOUT(), false, tdb_null,
198                                         NULL,
199                                         startrecovery_fail_callback,
200                                         rec) != 0) {
201                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
202                 talloc_free(tmp_ctx);
203                 return -1;
204         }
205
206         talloc_free(tmp_ctx);
207         return 0;
208 }
209
210 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
211 {
212         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
213                 DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
214                 return;
215         }
216         if (node_pnn < ctdb->num_nodes) {
217                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
218         }
219 }
220
221 /*
222   update the node capabilities for all connected nodes
223  */
224 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
225 {
226         uint32_t *nodes;
227         TALLOC_CTX *tmp_ctx;
228
229         tmp_ctx = talloc_new(ctdb);
230         CTDB_NO_MEMORY(ctdb, tmp_ctx);
231
232         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
233         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
234                                         nodes, CONTROL_TIMEOUT(),
235                                         false, tdb_null,
236                                         async_getcap_callback, NULL,
237                                         NULL) != 0) {
238                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
239                 talloc_free(tmp_ctx);
240                 return -1;
241         }
242
243         talloc_free(tmp_ctx);
244         return 0;
245 }
246
247 /*
248   change recovery mode on all nodes
249  */
250 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
251 {
252         TDB_DATA data;
253         uint32_t *nodes;
254         TALLOC_CTX *tmp_ctx;
255
256         tmp_ctx = talloc_new(ctdb);
257         CTDB_NO_MEMORY(ctdb, tmp_ctx);
258
259         /* freeze all nodes */
260         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
261         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
262                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
263                                                 nodes, CONTROL_TIMEOUT(),
264                                                 false, tdb_null,
265                                                 NULL, NULL,
266                                                 NULL) != 0) {
267                         DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
268                         talloc_free(tmp_ctx);
269                         return -1;
270                 }
271         }
272
273
274         data.dsize = sizeof(uint32_t);
275         data.dptr = (unsigned char *)&rec_mode;
276
277         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
278                                         nodes, CONTROL_TIMEOUT(),
279                                         false, data,
280                                         NULL, NULL,
281                                         NULL) != 0) {
282                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
283                 talloc_free(tmp_ctx);
284                 return -1;
285         }
286
287         talloc_free(tmp_ctx);
288         return 0;
289 }
290
291 /*
292   change recovery master on all node
293  */
294 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
295 {
296         TDB_DATA data;
297         TALLOC_CTX *tmp_ctx;
298         uint32_t *nodes;
299
300         tmp_ctx = talloc_new(ctdb);
301         CTDB_NO_MEMORY(ctdb, tmp_ctx);
302
303         data.dsize = sizeof(uint32_t);
304         data.dptr = (unsigned char *)&pnn;
305
306         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
307         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
308                                         nodes,
309                                         CONTROL_TIMEOUT(), false, data,
310                                         NULL, NULL,
311                                         NULL) != 0) {
312                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
313                 talloc_free(tmp_ctx);
314                 return -1;
315         }
316
317         talloc_free(tmp_ctx);
318         return 0;
319 }
320
321
322 /*
323   ensure all other nodes have attached to any databases that we have
324  */
325 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
326                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
327 {
328         int i, j, db, ret;
329         struct ctdb_dbid_map *remote_dbmap;
330
331         /* verify that all other nodes have all our databases */
332         for (j=0; j<nodemap->num; j++) {
333                 /* we dont need to ourself ourselves */
334                 if (nodemap->nodes[j].pnn == pnn) {
335                         continue;
336                 }
337                 /* dont check nodes that are unavailable */
338                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
339                         continue;
340                 }
341
342                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
343                                          mem_ctx, &remote_dbmap);
344                 if (ret != 0) {
345                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
346                         return -1;
347                 }
348
349                 /* step through all local databases */
350                 for (db=0; db<dbmap->num;db++) {
351                         const char *name;
352
353
354                         for (i=0;i<remote_dbmap->num;i++) {
355                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
356                                         break;
357                                 }
358                         }
359                         /* the remote node already have this database */
360                         if (i!=remote_dbmap->num) {
361                                 continue;
362                         }
363                         /* ok so we need to create this database */
364                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
365                                             mem_ctx, &name);
366                         if (ret != 0) {
367                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
368                                 return -1;
369                         }
370                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
371                                            mem_ctx, name, dbmap->dbs[db].persistent);
372                         if (ret != 0) {
373                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
374                                 return -1;
375                         }
376                 }
377         }
378
379         return 0;
380 }
381
382
383 /*
384   ensure we are attached to any databases that anyone else is attached to
385  */
386 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
387                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
388 {
389         int i, j, db, ret;
390         struct ctdb_dbid_map *remote_dbmap;
391
392         /* verify that we have all database any other node has */
393         for (j=0; j<nodemap->num; j++) {
394                 /* we dont need to ourself ourselves */
395                 if (nodemap->nodes[j].pnn == pnn) {
396                         continue;
397                 }
398                 /* dont check nodes that are unavailable */
399                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
400                         continue;
401                 }
402
403                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
404                                          mem_ctx, &remote_dbmap);
405                 if (ret != 0) {
406                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
407                         return -1;
408                 }
409
410                 /* step through all databases on the remote node */
411                 for (db=0; db<remote_dbmap->num;db++) {
412                         const char *name;
413
414                         for (i=0;i<(*dbmap)->num;i++) {
415                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
416                                         break;
417                                 }
418                         }
419                         /* we already have this db locally */
420                         if (i!=(*dbmap)->num) {
421                                 continue;
422                         }
423                         /* ok so we need to create this database and
424                            rebuild dbmap
425                          */
426                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
427                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
428                         if (ret != 0) {
429                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
430                                           nodemap->nodes[j].pnn));
431                                 return -1;
432                         }
433                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
434                                            remote_dbmap->dbs[db].persistent);
435                         if (ret != 0) {
436                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
437                                 return -1;
438                         }
439                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
440                         if (ret != 0) {
441                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
442                                 return -1;
443                         }
444                 }
445         }
446
447         return 0;
448 }
449
450
451 /*
452   pull the remote database contents from one node into the recdb
453  */
454 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
455                                     struct tdb_wrap *recdb, uint32_t dbid)
456 {
457         int ret;
458         TDB_DATA outdata;
459         struct ctdb_marshall_buffer *reply;
460         struct ctdb_rec_data *rec;
461         int i;
462         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
463
464         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
465                                CONTROL_TIMEOUT(), &outdata);
466         if (ret != 0) {
467                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
468                 talloc_free(tmp_ctx);
469                 return -1;
470         }
471
472         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
473
474         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
475                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
476                 talloc_free(tmp_ctx);
477                 return -1;
478         }
479         
480         rec = (struct ctdb_rec_data *)&reply->data[0];
481         
482         for (i=0;
483              i<reply->count;
484              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
485                 TDB_DATA key, data;
486                 struct ctdb_ltdb_header *hdr;
487                 TDB_DATA existing;
488                 
489                 key.dptr = &rec->data[0];
490                 key.dsize = rec->keylen;
491                 data.dptr = &rec->data[key.dsize];
492                 data.dsize = rec->datalen;
493                 
494                 hdr = (struct ctdb_ltdb_header *)data.dptr;
495
496                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
497                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
498                         talloc_free(tmp_ctx);
499                         return -1;
500                 }
501
502                 /* fetch the existing record, if any */
503                 existing = tdb_fetch(recdb->tdb, key);
504                 
505                 if (existing.dptr != NULL) {
506                         struct ctdb_ltdb_header header;
507                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
508                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
509                                          (unsigned)existing.dsize, srcnode));
510                                 free(existing.dptr);
511                                 talloc_free(tmp_ctx);
512                                 return -1;
513                         }
514                         header = *(struct ctdb_ltdb_header *)existing.dptr;
515                         free(existing.dptr);
516                         if (!(header.rsn < hdr->rsn ||
517                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
518                                 continue;
519                         }
520                 }
521                 
522                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
523                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
524                         talloc_free(tmp_ctx);
525                         return -1;                              
526                 }
527         }
528
529         talloc_free(tmp_ctx);
530
531         return 0;
532 }
533
534 /*
535   pull all the remote database contents into the recdb
536  */
537 static int pull_remote_database(struct ctdb_context *ctdb,
538                                 struct ctdb_recoverd *rec, 
539                                 struct ctdb_node_map *nodemap, 
540                                 struct tdb_wrap *recdb, uint32_t dbid)
541 {
542         int j;
543
544         /* pull all records from all other nodes across onto this node
545            (this merges based on rsn)
546         */
547         for (j=0; j<nodemap->num; j++) {
548                 /* dont merge from nodes that are unavailable */
549                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
550                         continue;
551                 }
552                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
553                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
554                                  nodemap->nodes[j].pnn));
555                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
556                         return -1;
557                 }
558         }
559         
560         return 0;
561 }
562
563
564 /*
565   update flags on all active nodes
566  */
567 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
568 {
569         int ret;
570
571         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
572                 if (ret != 0) {
573                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
574                 return -1;
575         }
576
577         return 0;
578 }
579
580 /*
581   ensure all nodes have the same vnnmap we do
582  */
583 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
584                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
585 {
586         int j, ret;
587
588         /* push the new vnn map out to all the nodes */
589         for (j=0; j<nodemap->num; j++) {
590                 /* dont push to nodes that are unavailable */
591                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
592                         continue;
593                 }
594
595                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
596                 if (ret != 0) {
597                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
598                         return -1;
599                 }
600         }
601
602         return 0;
603 }
604
605
606 struct vacuum_info {
607         struct vacuum_info *next, *prev;
608         struct ctdb_recoverd *rec;
609         uint32_t srcnode;
610         struct ctdb_db_context *ctdb_db;
611         struct ctdb_marshall_buffer *recs;
612         struct ctdb_rec_data *r;
613 };
614
615 static void vacuum_fetch_next(struct vacuum_info *v);
616
617 /*
618   called when a vacuum fetch has completed - just free it and do the next one
619  */
620 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
621 {
622         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
623         talloc_free(state);
624         vacuum_fetch_next(v);
625 }
626
627
628 /*
629   process the next element from the vacuum list
630 */
631 static void vacuum_fetch_next(struct vacuum_info *v)
632 {
633         struct ctdb_call call;
634         struct ctdb_rec_data *r;
635
636         while (v->recs->count) {
637                 struct ctdb_client_call_state *state;
638                 TDB_DATA data;
639                 struct ctdb_ltdb_header *hdr;
640
641                 ZERO_STRUCT(call);
642                 call.call_id = CTDB_NULL_FUNC;
643                 call.flags = CTDB_IMMEDIATE_MIGRATION;
644
645                 r = v->r;
646                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
647                 v->recs->count--;
648
649                 call.key.dptr = &r->data[0];
650                 call.key.dsize = r->keylen;
651
652                 /* ensure we don't block this daemon - just skip a record if we can't get
653                    the chainlock */
654                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
655                         continue;
656                 }
657
658                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
659                 if (data.dptr == NULL) {
660                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
661                         continue;
662                 }
663
664                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
665                         free(data.dptr);
666                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
667                         continue;
668                 }
669                 
670                 hdr = (struct ctdb_ltdb_header *)data.dptr;
671                 if (hdr->dmaster == v->rec->ctdb->pnn) {
672                         /* its already local */
673                         free(data.dptr);
674                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
675                         continue;
676                 }
677
678                 free(data.dptr);
679
680                 state = ctdb_call_send(v->ctdb_db, &call);
681                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
682                 if (state == NULL) {
683                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
684                         talloc_free(v);
685                         return;
686                 }
687                 state->async.fn = vacuum_fetch_callback;
688                 state->async.private_data = v;
689                 return;
690         }
691
692         talloc_free(v);
693 }
694
695
696 /*
697   destroy a vacuum info structure
698  */
699 static int vacuum_info_destructor(struct vacuum_info *v)
700 {
701         DLIST_REMOVE(v->rec->vacuum_info, v);
702         return 0;
703 }
704
705
706 /*
707   handler for vacuum fetch
708 */
709 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
710                                  TDB_DATA data, void *private_data)
711 {
712         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
713         struct ctdb_marshall_buffer *recs;
714         int ret, i;
715         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
716         const char *name;
717         struct ctdb_dbid_map *dbmap=NULL;
718         bool persistent = false;
719         struct ctdb_db_context *ctdb_db;
720         struct ctdb_rec_data *r;
721         uint32_t srcnode;
722         struct vacuum_info *v;
723
724         recs = (struct ctdb_marshall_buffer *)data.dptr;
725         r = (struct ctdb_rec_data *)&recs->data[0];
726
727         if (recs->count == 0) {
728                 talloc_free(tmp_ctx);
729                 return;
730         }
731
732         srcnode = r->reqid;
733
734         for (v=rec->vacuum_info;v;v=v->next) {
735                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
736                         /* we're already working on records from this node */
737                         talloc_free(tmp_ctx);
738                         return;
739                 }
740         }
741
742         /* work out if the database is persistent */
743         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
744         if (ret != 0) {
745                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
746                 talloc_free(tmp_ctx);
747                 return;
748         }
749
750         for (i=0;i<dbmap->num;i++) {
751                 if (dbmap->dbs[i].dbid == recs->db_id) {
752                         persistent = dbmap->dbs[i].persistent;
753                         break;
754                 }
755         }
756         if (i == dbmap->num) {
757                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
758                 talloc_free(tmp_ctx);
759                 return;         
760         }
761
762         /* find the name of this database */
763         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
764                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
765                 talloc_free(tmp_ctx);
766                 return;
767         }
768
769         /* attach to it */
770         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
771         if (ctdb_db == NULL) {
772                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
773                 talloc_free(tmp_ctx);
774                 return;
775         }
776
777         v = talloc_zero(rec, struct vacuum_info);
778         if (v == NULL) {
779                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
780                 talloc_free(tmp_ctx);
781                 return;
782         }
783
784         v->rec = rec;
785         v->srcnode = srcnode;
786         v->ctdb_db = ctdb_db;
787         v->recs = talloc_memdup(v, recs, data.dsize);
788         if (v->recs == NULL) {
789                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
790                 talloc_free(v);
791                 talloc_free(tmp_ctx);
792                 return;         
793         }
794         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
795
796         DLIST_ADD(rec->vacuum_info, v);
797
798         talloc_set_destructor(v, vacuum_info_destructor);
799
800         vacuum_fetch_next(v);
801         talloc_free(tmp_ctx);
802 }
803
804
805 /*
806   called when ctdb_wait_timeout should finish
807  */
808 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
809                               struct timeval yt, void *p)
810 {
811         uint32_t *timed_out = (uint32_t *)p;
812         (*timed_out) = 1;
813 }
814
815 /*
816   wait for a given number of seconds
817  */
818 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
819 {
820         uint32_t timed_out = 0;
821         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
822         while (!timed_out) {
823                 event_loop_once(ctdb->ev);
824         }
825 }
826
827 /*
828   called when an election times out (ends)
829  */
830 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
831                                   struct timeval t, void *p)
832 {
833         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
834         rec->election_timeout = NULL;
835
836         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
837 }
838
839
840 /*
841   wait for an election to finish. It finished election_timeout seconds after
842   the last election packet is received
843  */
844 static void ctdb_wait_election(struct ctdb_recoverd *rec)
845 {
846         struct ctdb_context *ctdb = rec->ctdb;
847         while (rec->election_timeout) {
848                 event_loop_once(ctdb->ev);
849         }
850 }
851
852 /*
853   Update our local flags from all remote connected nodes. 
854   This is only run when we are or we belive we are the recovery master
855  */
856 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
857 {
858         int j;
859         struct ctdb_context *ctdb = rec->ctdb;
860         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
861
862         /* get the nodemap for all active remote nodes and verify
863            they are the same as for this node
864          */
865         for (j=0; j<nodemap->num; j++) {
866                 struct ctdb_node_map *remote_nodemap=NULL;
867                 int ret;
868
869                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
870                         continue;
871                 }
872                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
873                         continue;
874                 }
875
876                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
877                                            mem_ctx, &remote_nodemap);
878                 if (ret != 0) {
879                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
880                                   nodemap->nodes[j].pnn));
881                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
882                         talloc_free(mem_ctx);
883                         return MONITOR_FAILED;
884                 }
885                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
886                         /* We should tell our daemon about this so it
887                            updates its flags or else we will log the same 
888                            message again in the next iteration of recovery.
889                            Since we are the recovery master we can just as
890                            well update the flags on all nodes.
891                         */
892                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
893                         if (ret != 0) {
894                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
895                                 return -1;
896                         }
897
898                         /* Update our local copy of the flags in the recovery
899                            daemon.
900                         */
901                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
902                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
903                                  nodemap->nodes[j].flags));
904                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
905                 }
906                 talloc_free(remote_nodemap);
907         }
908         talloc_free(mem_ctx);
909         return MONITOR_OK;
910 }
911
912
913 /* Create a new random generation ip. 
914    The generation id can not be the INVALID_GENERATION id
915 */
916 static uint32_t new_generation(void)
917 {
918         uint32_t generation;
919
920         while (1) {
921                 generation = random();
922
923                 if (generation != INVALID_GENERATION) {
924                         break;
925                 }
926         }
927
928         return generation;
929 }
930
931
932 /*
933   create a temporary working database
934  */
935 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
936 {
937         char *name;
938         struct tdb_wrap *recdb;
939         unsigned tdb_flags;
940
941         /* open up the temporary recovery database */
942         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
943         if (name == NULL) {
944                 return NULL;
945         }
946         unlink(name);
947
948         tdb_flags = TDB_NOLOCK;
949         if (!ctdb->do_setsched) {
950                 tdb_flags |= TDB_NOMMAP;
951         }
952
953         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
954                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
955         if (recdb == NULL) {
956                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
957         }
958
959         talloc_free(name);
960
961         return recdb;
962 }
963
964
965 /* 
966    a traverse function for pulling all relevent records from recdb
967  */
968 struct recdb_data {
969         struct ctdb_context *ctdb;
970         struct ctdb_marshall_buffer *recdata;
971         uint32_t len;
972         bool failed;
973 };
974
975 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
976 {
977         struct recdb_data *params = (struct recdb_data *)p;
978         struct ctdb_rec_data *rec;
979         struct ctdb_ltdb_header *hdr;
980
981         /* skip empty records */
982         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
983                 return 0;
984         }
985
986         /* update the dmaster field to point to us */
987         hdr = (struct ctdb_ltdb_header *)data.dptr;
988         hdr->dmaster = params->ctdb->pnn;
989
990         /* add the record to the blob ready to send to the nodes */
991         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
992         if (rec == NULL) {
993                 params->failed = true;
994                 return -1;
995         }
996         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
997         if (params->recdata == NULL) {
998                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
999                          rec->length + params->len, params->recdata->count));
1000                 params->failed = true;
1001                 return -1;
1002         }
1003         params->recdata->count++;
1004         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1005         params->len += rec->length;
1006         talloc_free(rec);
1007
1008         return 0;
1009 }
1010
1011 /*
1012   push the recdb database out to all nodes
1013  */
1014 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1015                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1016 {
1017         struct recdb_data params;
1018         struct ctdb_marshall_buffer *recdata;
1019         TDB_DATA outdata;
1020         TALLOC_CTX *tmp_ctx;
1021         uint32_t *nodes;
1022
1023         tmp_ctx = talloc_new(ctdb);
1024         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1025
1026         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1027         CTDB_NO_MEMORY(ctdb, recdata);
1028
1029         recdata->db_id = dbid;
1030
1031         params.ctdb = ctdb;
1032         params.recdata = recdata;
1033         params.len = offsetof(struct ctdb_marshall_buffer, data);
1034         params.failed = false;
1035
1036         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1037                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1038                 talloc_free(params.recdata);
1039                 talloc_free(tmp_ctx);
1040                 return -1;
1041         }
1042
1043         if (params.failed) {
1044                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1045                 talloc_free(params.recdata);
1046                 talloc_free(tmp_ctx);
1047                 return -1;              
1048         }
1049
1050         recdata = params.recdata;
1051
1052         outdata.dptr = (void *)recdata;
1053         outdata.dsize = params.len;
1054
1055         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1056         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1057                                         nodes,
1058                                         CONTROL_TIMEOUT(), false, outdata,
1059                                         NULL, NULL,
1060                                         NULL) != 0) {
1061                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1062                 talloc_free(recdata);
1063                 talloc_free(tmp_ctx);
1064                 return -1;
1065         }
1066
1067         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1068                   dbid, recdata->count));
1069
1070         talloc_free(recdata);
1071         talloc_free(tmp_ctx);
1072
1073         return 0;
1074 }
1075
1076
1077 /*
1078   go through a full recovery on one database 
1079  */
1080 static int recover_database(struct ctdb_recoverd *rec, 
1081                             TALLOC_CTX *mem_ctx,
1082                             uint32_t dbid,
1083                             uint32_t pnn, 
1084                             struct ctdb_node_map *nodemap,
1085                             uint32_t transaction_id)
1086 {
1087         struct tdb_wrap *recdb;
1088         int ret;
1089         struct ctdb_context *ctdb = rec->ctdb;
1090         TDB_DATA data;
1091         struct ctdb_control_wipe_database w;
1092         uint32_t *nodes;
1093
1094         recdb = create_recdb(ctdb, mem_ctx);
1095         if (recdb == NULL) {
1096                 return -1;
1097         }
1098
1099         /* pull all remote databases onto the recdb */
1100         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid);
1101         if (ret != 0) {
1102                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1103                 return -1;
1104         }
1105
1106         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1107
1108         /* wipe all the remote databases. This is safe as we are in a transaction */
1109         w.db_id = dbid;
1110         w.transaction_id = transaction_id;
1111
1112         data.dptr = (void *)&w;
1113         data.dsize = sizeof(w);
1114
1115         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1116         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1117                                         nodes,
1118                                         CONTROL_TIMEOUT(), false, data,
1119                                         NULL, NULL,
1120                                         NULL) != 0) {
1121                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1122                 talloc_free(recdb);
1123                 return -1;
1124         }
1125         
1126         /* push out the correct database. This sets the dmaster and skips 
1127            the empty records */
1128         ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
1129         if (ret != 0) {
1130                 talloc_free(recdb);
1131                 return -1;
1132         }
1133
1134         /* all done with this database */
1135         talloc_free(recdb);
1136
1137         return 0;
1138 }
1139
1140 /*
1141   reload the nodes file 
1142 */
1143 static void reload_nodes_file(struct ctdb_context *ctdb)
1144 {
1145         ctdb->nodes = NULL;
1146         ctdb_load_nodes_file(ctdb);
1147 }
1148
1149         
1150 /*
1151   we are the recmaster, and recovery is needed - start a recovery run
1152  */
1153 static int do_recovery(struct ctdb_recoverd *rec, 
1154                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1155                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1156 {
1157         struct ctdb_context *ctdb = rec->ctdb;
1158         int i, j, ret;
1159         uint32_t generation;
1160         struct ctdb_dbid_map *dbmap;
1161         TDB_DATA data;
1162         uint32_t *nodes;
1163         struct timeval start_time;
1164
1165         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1166
1167         /* if recovery fails, force it again */
1168         rec->need_recovery = true;
1169
1170         for (i=0; i<ctdb->num_nodes; i++) {
1171                 struct ctdb_banning_state *ban_state;
1172
1173                 if (ctdb->nodes[i]->ban_state == NULL) {
1174                         continue;
1175                 }
1176                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1177                 if (ban_state->count < 2*ctdb->num_nodes) {
1178                         continue;
1179                 }
1180                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1181                         ctdb->nodes[i]->pnn, ban_state->count,
1182                         ctdb->tunable.recovery_ban_period));
1183                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1184                 ban_state->count = 0;
1185         }
1186
1187
1188         if (ctdb->tunable.verify_recovery_lock != 0) {
1189                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1190                 start_time = timeval_current();
1191                 if (!ctdb_recovery_lock(ctdb, true)) {
1192                         ctdb_set_culprit(rec, pnn);
1193                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1194                         return -1;
1195                 }
1196                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1197                 DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1198         }
1199
1200         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1201
1202         /* get a list of all databases */
1203         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1204         if (ret != 0) {
1205                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1206                 return -1;
1207         }
1208
1209         /* we do the db creation before we set the recovery mode, so the freeze happens
1210            on all databases we will be dealing with. */
1211
1212         /* verify that we have all the databases any other node has */
1213         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1214         if (ret != 0) {
1215                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1216                 return -1;
1217         }
1218
1219         /* verify that all other nodes have all our databases */
1220         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1221         if (ret != 0) {
1222                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1223                 return -1;
1224         }
1225
1226         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1227
1228
1229         /* set recovery mode to active on all nodes */
1230         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1231         if (ret != 0) {
1232                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1233                 return -1;
1234         }
1235
1236         /* execute the "startrecovery" event script on all nodes */
1237         ret = run_startrecovery_eventscript(rec, nodemap);
1238         if (ret!=0) {
1239                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1240                 return -1;
1241         }
1242
1243         /* pick a new generation number */
1244         generation = new_generation();
1245
1246         /* change the vnnmap on this node to use the new generation 
1247            number but not on any other nodes.
1248            this guarantees that if we abort the recovery prematurely
1249            for some reason (a node stops responding?)
1250            that we can just return immediately and we will reenter
1251            recovery shortly again.
1252            I.e. we deliberately leave the cluster with an inconsistent
1253            generation id to allow us to abort recovery at any stage and
1254            just restart it from scratch.
1255          */
1256         vnnmap->generation = generation;
1257         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1258         if (ret != 0) {
1259                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1260                 return -1;
1261         }
1262
1263         data.dptr = (void *)&generation;
1264         data.dsize = sizeof(uint32_t);
1265
1266         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1267         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1268                                         nodes,
1269                                         CONTROL_TIMEOUT(), false, data,
1270                                         NULL, NULL,
1271                                         NULL) != 0) {
1272                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1273                 return -1;
1274         }
1275
1276         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1277
1278         for (i=0;i<dbmap->num;i++) {
1279                 if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
1280                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1281                         return -1;
1282                 }
1283         }
1284
1285         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1286
1287         /* commit all the changes */
1288         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1289                                         nodes,
1290                                         CONTROL_TIMEOUT(), false, data,
1291                                         NULL, NULL,
1292                                         NULL) != 0) {
1293                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1294                 return -1;
1295         }
1296
1297         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1298         
1299
1300         /* update the capabilities for all nodes */
1301         ret = update_capabilities(ctdb, nodemap);
1302         if (ret!=0) {
1303                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1304                 return -1;
1305         }
1306
1307         /* build a new vnn map with all the currently active and
1308            unbanned nodes */
1309         generation = new_generation();
1310         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1311         CTDB_NO_MEMORY(ctdb, vnnmap);
1312         vnnmap->generation = generation;
1313         vnnmap->size = 0;
1314         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1315         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1316         for (i=j=0;i<nodemap->num;i++) {
1317                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1318                         continue;
1319                 }
1320                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1321                         /* this node can not be an lmaster */
1322                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1323                         continue;
1324                 }
1325
1326                 vnnmap->size++;
1327                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1328                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1329                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1330
1331         }
1332         if (vnnmap->size == 0) {
1333                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1334                 vnnmap->size++;
1335                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1336                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1337                 vnnmap->map[0] = pnn;
1338         }       
1339
1340         /* update to the new vnnmap on all nodes */
1341         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1342         if (ret != 0) {
1343                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1344                 return -1;
1345         }
1346
1347         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1348
1349         /* update recmaster to point to us for all nodes */
1350         ret = set_recovery_master(ctdb, nodemap, pnn);
1351         if (ret!=0) {
1352                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1353                 return -1;
1354         }
1355
1356         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1357
1358         /*
1359           update all nodes to have the same flags that we have
1360          */
1361         for (i=0;i<nodemap->num;i++) {
1362                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1363                         continue;
1364                 }
1365
1366                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1367                 if (ret != 0) {
1368                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1369                         return -1;
1370                 }
1371         }
1372
1373         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1374
1375         /* disable recovery mode */
1376         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
1377         if (ret != 0) {
1378                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1379                 return -1;
1380         }
1381
1382         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1383
1384         /*
1385           tell nodes to takeover their public IPs
1386          */
1387         rec->need_takeover_run = false;
1388         ret = ctdb_takeover_run(ctdb, nodemap);
1389         if (ret != 0) {
1390                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1391                 return -1;
1392         }
1393         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1394
1395         /* execute the "recovered" event script on all nodes */
1396         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1397         if (ret!=0) {
1398                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1399                 return -1;
1400         }
1401
1402         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1403
1404         /* send a message to all clients telling them that the cluster 
1405            has been reconfigured */
1406         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1407
1408         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1409
1410         rec->need_recovery = false;
1411
1412         /* we managed to complete a full recovery, make sure to forgive
1413            any past sins by the nodes that could now participate in the
1414            recovery.
1415         */
1416         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1417         for (i=0;i<nodemap->num;i++) {
1418                 struct ctdb_banning_state *ban_state;
1419
1420                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1421                         continue;
1422                 }
1423
1424                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1425                 if (ban_state == NULL) {
1426                         continue;
1427                 }
1428
1429                 ban_state->count = 0;
1430         }
1431
1432
1433         /* We just finished a recovery successfully. 
1434            We now wait for rerecovery_timeout before we allow 
1435            another recovery to take place.
1436         */
1437         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1438         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1439         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1440
1441         return 0;
1442 }
1443
1444
1445 /*
1446   elections are won by first checking the number of connected nodes, then
1447   the priority time, then the pnn
1448  */
1449 struct election_message {
1450         uint32_t num_connected;
1451         struct timeval priority_time;
1452         uint32_t pnn;
1453         uint32_t node_flags;
1454 };
1455
1456 /*
1457   form this nodes election data
1458  */
1459 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1460 {
1461         int ret, i;
1462         struct ctdb_node_map *nodemap;
1463         struct ctdb_context *ctdb = rec->ctdb;
1464
1465         ZERO_STRUCTP(em);
1466
1467         em->pnn = rec->ctdb->pnn;
1468         em->priority_time = rec->priority_time;
1469
1470         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1471         if (ret != 0) {
1472                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1473                 return;
1474         }
1475
1476         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1477         em->node_flags = rec->node_flags;
1478
1479         for (i=0;i<nodemap->num;i++) {
1480                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1481                         em->num_connected++;
1482                 }
1483         }
1484
1485         /* we shouldnt try to win this election if we cant be a recmaster */
1486         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1487                 em->num_connected = 0;
1488                 em->priority_time = timeval_current();
1489         }
1490
1491         talloc_free(nodemap);
1492 }
1493
1494 /*
1495   see if the given election data wins
1496  */
1497 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1498 {
1499         struct election_message myem;
1500         int cmp = 0;
1501
1502         ctdb_election_data(rec, &myem);
1503
1504         /* we cant win if we dont have the recmaster capability */
1505         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1506                 return false;
1507         }
1508
1509         /* we cant win if we are banned */
1510         if (rec->node_flags & NODE_FLAGS_BANNED) {
1511                 return false;
1512         }       
1513
1514         /* we cant win if we are stopped */
1515         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1516                 return false;
1517         }       
1518
1519         /* we will automatically win if the other node is banned */
1520         if (em->node_flags & NODE_FLAGS_BANNED) {
1521                 return true;
1522         }
1523
1524         /* we will automatically win if the other node is banned */
1525         if (em->node_flags & NODE_FLAGS_STOPPED) {
1526                 return true;
1527         }
1528
1529         /* try to use the most connected node */
1530         if (cmp == 0) {
1531                 cmp = (int)myem.num_connected - (int)em->num_connected;
1532         }
1533
1534         /* then the longest running node */
1535         if (cmp == 0) {
1536                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1537         }
1538
1539         if (cmp == 0) {
1540                 cmp = (int)myem.pnn - (int)em->pnn;
1541         }
1542
1543         return cmp > 0;
1544 }
1545
1546 /*
1547   send out an election request
1548  */
1549 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1550 {
1551         int ret;
1552         TDB_DATA election_data;
1553         struct election_message emsg;
1554         uint64_t srvid;
1555         struct ctdb_context *ctdb = rec->ctdb;
1556
1557         srvid = CTDB_SRVID_RECOVERY;
1558
1559         ctdb_election_data(rec, &emsg);
1560
1561         election_data.dsize = sizeof(struct election_message);
1562         election_data.dptr  = (unsigned char *)&emsg;
1563
1564
1565         /* send an election message to all active nodes */
1566         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1567         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1568
1569
1570         /* A new node that is already frozen has entered the cluster.
1571            The existing nodes are not frozen and dont need to be frozen
1572            until the election has ended and we start the actual recovery
1573         */
1574         if (update_recmaster == true) {
1575                 /* first we assume we will win the election and set 
1576                    recoverymaster to be ourself on the current node
1577                  */
1578                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1579                 if (ret != 0) {
1580                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1581                         return -1;
1582                 }
1583         }
1584
1585
1586         return 0;
1587 }
1588
1589 /*
1590   this function will unban all nodes in the cluster
1591 */
1592 static void unban_all_nodes(struct ctdb_context *ctdb)
1593 {
1594         int ret, i;
1595         struct ctdb_node_map *nodemap;
1596         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1597         
1598         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1599         if (ret != 0) {
1600                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1601                 return;
1602         }
1603
1604         for (i=0;i<nodemap->num;i++) {
1605                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1606                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1607                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1608                 }
1609         }
1610
1611         talloc_free(tmp_ctx);
1612 }
1613
1614
1615 /*
1616   we think we are winning the election - send a broadcast election request
1617  */
1618 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1619 {
1620         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1621         int ret;
1622
1623         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1624         if (ret != 0) {
1625                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1626         }
1627
1628         talloc_free(rec->send_election_te);
1629         rec->send_election_te = NULL;
1630 }
1631
1632 /*
1633   handler for memory dumps
1634 */
1635 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1636                              TDB_DATA data, void *private_data)
1637 {
1638         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1639         TDB_DATA *dump;
1640         int ret;
1641         struct rd_memdump_reply *rd;
1642
1643         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1644                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1645                 talloc_free(tmp_ctx);
1646                 return;
1647         }
1648         rd = (struct rd_memdump_reply *)data.dptr;
1649
1650         dump = talloc_zero(tmp_ctx, TDB_DATA);
1651         if (dump == NULL) {
1652                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1653                 talloc_free(tmp_ctx);
1654                 return;
1655         }
1656         ret = ctdb_dump_memory(ctdb, dump);
1657         if (ret != 0) {
1658                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1659                 talloc_free(tmp_ctx);
1660                 return;
1661         }
1662
1663 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1664
1665         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1666         if (ret != 0) {
1667                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1668                 talloc_free(tmp_ctx);
1669                 return;
1670         }
1671
1672         talloc_free(tmp_ctx);
1673 }
1674
1675 /*
1676   handler for reload_nodes
1677 */
1678 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1679                              TDB_DATA data, void *private_data)
1680 {
1681         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1682
1683         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1684
1685         reload_nodes_file(rec->ctdb);
1686 }
1687
1688 /*
1689   handler for ip reallocate, just add it to the list of callers and 
1690   handle this later in the monitor_cluster loop so we do not recurse
1691   with other callers to takeover_run()
1692 */
1693 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1694                              TDB_DATA data, void *private_data)
1695 {
1696         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1697         struct ip_reallocate_list *caller;
1698
1699         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1700                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1701                 return;
1702         }
1703
1704         if (rec->ip_reallocate_ctx == NULL) {
1705                 rec->ip_reallocate_ctx = talloc_new(rec);
1706                 CTDB_NO_MEMORY_FATAL(ctdb, caller);
1707         }
1708
1709         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1710         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1711
1712         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1713         caller->next = rec->reallocate_callers;
1714         rec->reallocate_callers = caller;
1715
1716         return;
1717 }
1718
1719 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1720 {
1721         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1722         TDB_DATA result;
1723         int32_t ret;
1724         struct ip_reallocate_list *callers;
1725
1726         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1727         ret = ctdb_takeover_run(ctdb, rec->nodemap);
1728         result.dsize = sizeof(int32_t);
1729         result.dptr  = (uint8_t *)&ret;
1730
1731         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
1732                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to %u:%lu\n", callers->rd->pnn, callers->rd->srvid));
1733                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
1734                 if (ret != 0) {
1735                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply message to %u:%lu\n", callers->rd->pnn, callers->rd->srvid));
1736                 }
1737         }
1738
1739         talloc_free(tmp_ctx);
1740         talloc_free(rec->ip_reallocate_ctx);
1741         rec->ip_reallocate_ctx = NULL;
1742         rec->reallocate_callers = NULL;
1743         
1744 }
1745
1746
1747 /*
1748   handler for recovery master elections
1749 */
1750 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1751                              TDB_DATA data, void *private_data)
1752 {
1753         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1754         int ret;
1755         struct election_message *em = (struct election_message *)data.dptr;
1756         TALLOC_CTX *mem_ctx;
1757
1758         /* we got an election packet - update the timeout for the election */
1759         talloc_free(rec->election_timeout);
1760         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1761                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1762                                                 ctdb_election_timeout, rec);
1763
1764         mem_ctx = talloc_new(ctdb);
1765
1766         /* someone called an election. check their election data
1767            and if we disagree and we would rather be the elected node, 
1768            send a new election message to all other nodes
1769          */
1770         if (ctdb_election_win(rec, em)) {
1771                 if (!rec->send_election_te) {
1772                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1773                                                                 timeval_current_ofs(0, 500000),
1774                                                                 election_send_request, rec);
1775                 }
1776                 talloc_free(mem_ctx);
1777                 /*unban_all_nodes(ctdb);*/
1778                 return;
1779         }
1780         
1781         /* we didn't win */
1782         talloc_free(rec->send_election_te);
1783         rec->send_election_te = NULL;
1784
1785         if (ctdb->tunable.verify_recovery_lock != 0) {
1786                 /* release the recmaster lock */
1787                 if (em->pnn != ctdb->pnn &&
1788                     ctdb->recovery_lock_fd != -1) {
1789                         close(ctdb->recovery_lock_fd);
1790                         ctdb->recovery_lock_fd = -1;
1791                         unban_all_nodes(ctdb);
1792                 }
1793         }
1794
1795         /* ok, let that guy become recmaster then */
1796         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1797         if (ret != 0) {
1798                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
1799                 talloc_free(mem_ctx);
1800                 return;
1801         }
1802
1803         talloc_free(mem_ctx);
1804         return;
1805 }
1806
1807
1808 /*
1809   force the start of the election process
1810  */
1811 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1812                            struct ctdb_node_map *nodemap)
1813 {
1814         int ret;
1815         struct ctdb_context *ctdb = rec->ctdb;
1816
1817         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1818
1819         /* set all nodes to recovery mode to stop all internode traffic */
1820         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1821         if (ret != 0) {
1822                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1823                 return;
1824         }
1825
1826         talloc_free(rec->election_timeout);
1827         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1828                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1829                                                 ctdb_election_timeout, rec);
1830
1831         ret = send_election_request(rec, pnn, true);
1832         if (ret!=0) {
1833                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
1834                 return;
1835         }
1836
1837         /* wait for a few seconds to collect all responses */
1838         ctdb_wait_election(rec);
1839 }
1840
1841
1842
1843 /*
1844   handler for when a node changes its flags
1845 */
1846 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1847                             TDB_DATA data, void *private_data)
1848 {
1849         int ret;
1850         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1851         struct ctdb_node_map *nodemap=NULL;
1852         TALLOC_CTX *tmp_ctx;
1853         uint32_t changed_flags;
1854         int i;
1855         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1856
1857         if (data.dsize != sizeof(*c)) {
1858                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1859                 return;
1860         }
1861
1862         tmp_ctx = talloc_new(ctdb);
1863         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1864
1865         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1866         if (ret != 0) {
1867                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1868                 talloc_free(tmp_ctx);
1869                 return;         
1870         }
1871
1872
1873         for (i=0;i<nodemap->num;i++) {
1874                 if (nodemap->nodes[i].pnn == c->pnn) break;
1875         }
1876
1877         if (i == nodemap->num) {
1878                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1879                 talloc_free(tmp_ctx);
1880                 return;
1881         }
1882
1883         changed_flags = c->old_flags ^ c->new_flags;
1884
1885         if (nodemap->nodes[i].flags != c->new_flags) {
1886                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1887         }
1888
1889         nodemap->nodes[i].flags = c->new_flags;
1890
1891         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1892                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1893
1894         if (ret == 0) {
1895                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1896                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1897         }
1898         
1899         if (ret == 0 &&
1900             ctdb->recovery_master == ctdb->pnn &&
1901             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
1902                 /* Only do the takeover run if the perm disabled or unhealthy
1903                    flags changed since these will cause an ip failover but not
1904                    a recovery.
1905                    If the node became disconnected or banned this will also
1906                    lead to an ip address failover but that is handled 
1907                    during recovery
1908                 */
1909                 if (changed_flags & NODE_FLAGS_DISABLED) {
1910                         rec->need_takeover_run = true;
1911                 }
1912         }
1913
1914         talloc_free(tmp_ctx);
1915 }
1916
1917 /*
1918   handler for when we need to push out flag changes ot all other nodes
1919 */
1920 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1921                             TDB_DATA data, void *private_data)
1922 {
1923         int ret;
1924         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1925
1926         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), c->pnn, c->new_flags, ~c->new_flags);
1927         if (ret != 0) {
1928                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1929         }
1930 }
1931
1932
1933 struct verify_recmode_normal_data {
1934         uint32_t count;
1935         enum monitor_result status;
1936 };
1937
1938 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1939 {
1940         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
1941
1942
1943         /* one more node has responded with recmode data*/
1944         rmdata->count--;
1945
1946         /* if we failed to get the recmode, then return an error and let
1947            the main loop try again.
1948         */
1949         if (state->state != CTDB_CONTROL_DONE) {
1950                 if (rmdata->status == MONITOR_OK) {
1951                         rmdata->status = MONITOR_FAILED;
1952                 }
1953                 return;
1954         }
1955
1956         /* if we got a response, then the recmode will be stored in the
1957            status field
1958         */
1959         if (state->status != CTDB_RECOVERY_NORMAL) {
1960                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
1961                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1962         }
1963
1964         return;
1965 }
1966
1967
1968 /* verify that all nodes are in normal recovery mode */
1969 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
1970 {
1971         struct verify_recmode_normal_data *rmdata;
1972         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1973         struct ctdb_client_control_state *state;
1974         enum monitor_result status;
1975         int j;
1976         
1977         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
1978         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1979         rmdata->count  = 0;
1980         rmdata->status = MONITOR_OK;
1981
1982         /* loop over all active nodes and send an async getrecmode call to 
1983            them*/
1984         for (j=0; j<nodemap->num; j++) {
1985                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1986                         continue;
1987                 }
1988                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
1989                                         CONTROL_TIMEOUT(), 
1990                                         nodemap->nodes[j].pnn);
1991                 if (state == NULL) {
1992                         /* we failed to send the control, treat this as 
1993                            an error and try again next iteration
1994                         */                      
1995                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
1996                         talloc_free(mem_ctx);
1997                         return MONITOR_FAILED;
1998                 }
1999
2000                 /* set up the callback functions */
2001                 state->async.fn = verify_recmode_normal_callback;
2002                 state->async.private_data = rmdata;
2003
2004                 /* one more control to wait for to complete */
2005                 rmdata->count++;
2006         }
2007
2008
2009         /* now wait for up to the maximum number of seconds allowed
2010            or until all nodes we expect a response from has replied
2011         */
2012         while (rmdata->count > 0) {
2013                 event_loop_once(ctdb->ev);
2014         }
2015
2016         status = rmdata->status;
2017         talloc_free(mem_ctx);
2018         return status;
2019 }
2020
2021
2022 struct verify_recmaster_data {
2023         struct ctdb_recoverd *rec;
2024         uint32_t count;
2025         uint32_t pnn;
2026         enum monitor_result status;
2027 };
2028
2029 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2030 {
2031         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2032
2033
2034         /* one more node has responded with recmaster data*/
2035         rmdata->count--;
2036
2037         /* if we failed to get the recmaster, then return an error and let
2038            the main loop try again.
2039         */
2040         if (state->state != CTDB_CONTROL_DONE) {
2041                 if (rmdata->status == MONITOR_OK) {
2042                         rmdata->status = MONITOR_FAILED;
2043                 }
2044                 return;
2045         }
2046
2047         /* if we got a response, then the recmaster will be stored in the
2048            status field
2049         */
2050         if (state->status != rmdata->pnn) {
2051                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2052                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2053                 rmdata->status = MONITOR_ELECTION_NEEDED;
2054         }
2055
2056         return;
2057 }
2058
2059
2060 /* verify that all nodes agree that we are the recmaster */
2061 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2062 {
2063         struct ctdb_context *ctdb = rec->ctdb;
2064         struct verify_recmaster_data *rmdata;
2065         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2066         struct ctdb_client_control_state *state;
2067         enum monitor_result status;
2068         int j;
2069         
2070         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2071         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2072         rmdata->rec    = rec;
2073         rmdata->count  = 0;
2074         rmdata->pnn    = pnn;
2075         rmdata->status = MONITOR_OK;
2076
2077         /* loop over all active nodes and send an async getrecmaster call to 
2078            them*/
2079         for (j=0; j<nodemap->num; j++) {
2080                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2081                         continue;
2082                 }
2083                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2084                                         CONTROL_TIMEOUT(),
2085                                         nodemap->nodes[j].pnn);
2086                 if (state == NULL) {
2087                         /* we failed to send the control, treat this as 
2088                            an error and try again next iteration
2089                         */                      
2090                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2091                         talloc_free(mem_ctx);
2092                         return MONITOR_FAILED;
2093                 }
2094
2095                 /* set up the callback functions */
2096                 state->async.fn = verify_recmaster_callback;
2097                 state->async.private_data = rmdata;
2098
2099                 /* one more control to wait for to complete */
2100                 rmdata->count++;
2101         }
2102
2103
2104         /* now wait for up to the maximum number of seconds allowed
2105            or until all nodes we expect a response from has replied
2106         */
2107         while (rmdata->count > 0) {
2108                 event_loop_once(ctdb->ev);
2109         }
2110
2111         status = rmdata->status;
2112         talloc_free(mem_ctx);
2113         return status;
2114 }
2115
2116
2117 /* called to check that the allocation of public ip addresses is ok.
2118 */
2119 static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
2120 {
2121         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2122         struct ctdb_all_public_ips *ips = NULL;
2123         struct ctdb_uptime *uptime1 = NULL;
2124         struct ctdb_uptime *uptime2 = NULL;
2125         int ret, j;
2126
2127         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2128                                 CTDB_CURRENT_NODE, &uptime1);
2129         if (ret != 0) {
2130                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2131                 talloc_free(mem_ctx);
2132                 return -1;
2133         }
2134
2135         /* read the ip allocation from the local node */
2136         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2137         if (ret != 0) {
2138                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2139                 talloc_free(mem_ctx);
2140                 return -1;
2141         }
2142
2143         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2144                                 CTDB_CURRENT_NODE, &uptime2);
2145         if (ret != 0) {
2146                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2147                 talloc_free(mem_ctx);
2148                 return -1;
2149         }
2150
2151         /* skip the check if the startrecovery time has changed */
2152         if (timeval_compare(&uptime1->last_recovery_started,
2153                             &uptime2->last_recovery_started) != 0) {
2154                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2155                 talloc_free(mem_ctx);
2156                 return 0;
2157         }
2158
2159         /* skip the check if the endrecovery time has changed */
2160         if (timeval_compare(&uptime1->last_recovery_finished,
2161                             &uptime2->last_recovery_finished) != 0) {
2162                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2163                 talloc_free(mem_ctx);
2164                 return 0;
2165         }
2166
2167         /* skip the check if we have started but not finished recovery */
2168         if (timeval_compare(&uptime1->last_recovery_finished,
2169                             &uptime1->last_recovery_started) != 1) {
2170                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery. skipping public ip address check\n"));
2171                 talloc_free(mem_ctx);
2172
2173                 return 0;
2174         }
2175
2176         /* verify that we have the ip addresses we should have
2177            and we dont have ones we shouldnt have.
2178            if we find an inconsistency we set recmode to
2179            active on the local node and wait for the recmaster
2180            to do a full blown recovery
2181         */
2182         for (j=0; j<ips->num; j++) {
2183                 if (ips->ips[j].pnn == pnn) {
2184                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2185                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2186                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2187                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2188                                 if (ret != 0) {
2189                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2190
2191                                         talloc_free(mem_ctx);
2192                                         return -1;
2193                                 }
2194                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2195                                 if (ret != 0) {
2196                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2197
2198                                         talloc_free(mem_ctx);
2199                                         return -1;
2200                                 }
2201                         }
2202                 } else {
2203                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2204                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2205                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2206
2207                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2208                                 if (ret != 0) {
2209                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2210
2211                                         talloc_free(mem_ctx);
2212                                         return -1;
2213                                 }
2214                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2215                                 if (ret != 0) {
2216                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2217
2218                                         talloc_free(mem_ctx);
2219                                         return -1;
2220                                 }
2221                         }
2222                 }
2223         }
2224
2225         talloc_free(mem_ctx);
2226         return 0;
2227 }
2228
2229
2230 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2231 {
2232         struct ctdb_node_map **remote_nodemaps = callback_data;
2233
2234         if (node_pnn >= ctdb->num_nodes) {
2235                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2236                 return;
2237         }
2238
2239         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2240
2241 }
2242
2243 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2244         struct ctdb_node_map *nodemap,
2245         struct ctdb_node_map **remote_nodemaps)
2246 {
2247         uint32_t *nodes;
2248
2249         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2250         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2251                                         nodes,
2252                                         CONTROL_TIMEOUT(), false, tdb_null,
2253                                         async_getnodemap_callback,
2254                                         NULL,
2255                                         remote_nodemaps) != 0) {
2256                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2257
2258                 return -1;
2259         }
2260
2261         return 0;
2262 }
2263
2264 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2265 struct ctdb_check_reclock_state {
2266         struct ctdb_context *ctdb;
2267         struct timeval start_time;
2268         int fd[2];
2269         pid_t child;
2270         struct timed_event *te;
2271         struct fd_event *fde;
2272         enum reclock_child_status status;
2273 };
2274
2275 /* when we free the reclock state we must kill any child process.
2276 */
2277 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2278 {
2279         struct ctdb_context *ctdb = state->ctdb;
2280
2281         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2282
2283         if (state->fd[0] != -1) {
2284                 close(state->fd[0]);
2285                 state->fd[0] = -1;
2286         }
2287         if (state->fd[1] != -1) {
2288                 close(state->fd[1]);
2289                 state->fd[1] = -1;
2290         }
2291         kill(state->child, SIGKILL);
2292         return 0;
2293 }
2294
2295 /*
2296   called if our check_reclock child times out. this would happen if
2297   i/o to the reclock file blocks.
2298  */
2299 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2300                                          struct timeval t, void *private_data)
2301 {
2302         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2303                                            struct ctdb_check_reclock_state);
2304
2305         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2306         state->status = RECLOCK_TIMEOUT;
2307 }
2308
2309 /* this is called when the child process has completed checking the reclock
2310    file and has written data back to us through the pipe.
2311 */
2312 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2313                              uint16_t flags, void *private_data)
2314 {
2315         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2316                                              struct ctdb_check_reclock_state);
2317         char c = 0;
2318         int ret;
2319
2320         /* we got a response from our child process so we can abort the
2321            timeout.
2322         */
2323         talloc_free(state->te);
2324         state->te = NULL;
2325
2326         ret = read(state->fd[0], &c, 1);
2327         if (ret != 1 || c != RECLOCK_OK) {
2328                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2329                 state->status = RECLOCK_FAILED;
2330
2331                 return;
2332         }
2333
2334         state->status = RECLOCK_OK;
2335         return;
2336 }
2337
2338 static int check_recovery_lock(struct ctdb_context *ctdb)
2339 {
2340         int ret;
2341         struct ctdb_check_reclock_state *state;
2342         pid_t parent = getpid();
2343
2344         if (ctdb->recovery_lock_fd == -1) {
2345                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2346                 return -1;
2347         }
2348
2349         state = talloc(ctdb, struct ctdb_check_reclock_state);
2350         CTDB_NO_MEMORY(ctdb, state);
2351
2352         state->ctdb = ctdb;
2353         state->start_time = timeval_current();
2354         state->status = RECLOCK_CHECKING;
2355         state->fd[0] = -1;
2356         state->fd[1] = -1;
2357
2358         ret = pipe(state->fd);
2359         if (ret != 0) {
2360                 talloc_free(state);
2361                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2362                 return -1;
2363         }
2364
2365         state->child = fork();
2366         if (state->child == (pid_t)-1) {
2367                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2368                 close(state->fd[0]);
2369                 state->fd[0] = -1;
2370                 close(state->fd[1]);
2371                 state->fd[1] = -1;
2372                 talloc_free(state);
2373                 return -1;
2374         }
2375
2376         if (state->child == 0) {
2377                 char cc = RECLOCK_OK;
2378                 close(state->fd[0]);
2379                 state->fd[0] = -1;
2380
2381                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2382                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2383                         cc = RECLOCK_FAILED;
2384                 }
2385
2386                 write(state->fd[1], &cc, 1);
2387                 /* make sure we die when our parent dies */
2388                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2389                         sleep(5);
2390                         write(state->fd[1], &cc, 1);
2391                 }
2392                 _exit(0);
2393         }
2394         close(state->fd[1]);
2395         state->fd[1] = -1;
2396
2397         talloc_set_destructor(state, check_reclock_destructor);
2398
2399         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2400                                     ctdb_check_reclock_timeout, state);
2401         if (state->te == NULL) {
2402                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2403                 talloc_free(state);
2404                 return -1;
2405         }
2406
2407         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2408                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2409                                 reclock_child_handler,
2410                                 (void *)state);
2411
2412         if (state->fde == NULL) {
2413                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2414                 talloc_free(state);
2415                 return -1;
2416         }
2417
2418         while (state->status == RECLOCK_CHECKING) {
2419                 event_loop_once(ctdb->ev);
2420         }
2421
2422         if (state->status == RECLOCK_FAILED) {
2423                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2424                 close(ctdb->recovery_lock_fd);
2425                 ctdb->recovery_lock_fd = -1;
2426                 talloc_free(state);
2427                 return -1;
2428         }
2429
2430         talloc_free(state);
2431         return 0;
2432 }
2433
2434 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2435 {
2436         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2437         const char *reclockfile;
2438
2439         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2440                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2441                 talloc_free(tmp_ctx);
2442                 return -1;      
2443         }
2444
2445         if (reclockfile == NULL) {
2446                 if (ctdb->recovery_lock_file != NULL) {
2447                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2448                         talloc_free(ctdb->recovery_lock_file);
2449                         ctdb->recovery_lock_file = NULL;
2450                         if (ctdb->recovery_lock_fd != -1) {
2451                                 close(ctdb->recovery_lock_fd);
2452                                 ctdb->recovery_lock_fd = -1;
2453                         }
2454                 }
2455                 ctdb->tunable.verify_recovery_lock = 0;
2456                 talloc_free(tmp_ctx);
2457                 return 0;
2458         }
2459
2460         if (ctdb->recovery_lock_file == NULL) {
2461                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2462                 if (ctdb->recovery_lock_fd != -1) {
2463                         close(ctdb->recovery_lock_fd);
2464                         ctdb->recovery_lock_fd = -1;
2465                 }
2466                 talloc_free(tmp_ctx);
2467                 return 0;
2468         }
2469
2470
2471         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2472                 talloc_free(tmp_ctx);
2473                 return 0;
2474         }
2475
2476         talloc_free(ctdb->recovery_lock_file);
2477         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2478         ctdb->tunable.verify_recovery_lock = 0;
2479         if (ctdb->recovery_lock_fd != -1) {
2480                 close(ctdb->recovery_lock_fd);
2481                 ctdb->recovery_lock_fd = -1;
2482         }
2483
2484         talloc_free(tmp_ctx);
2485         return 0;
2486 }
2487                 
2488 /*
2489   the main monitoring loop
2490  */
2491 static void monitor_cluster(struct ctdb_context *ctdb)
2492 {
2493         uint32_t pnn;
2494         TALLOC_CTX *mem_ctx=NULL;
2495         struct ctdb_node_map *nodemap=NULL;
2496         struct ctdb_node_map *recmaster_nodemap=NULL;
2497         struct ctdb_node_map **remote_nodemaps=NULL;
2498         struct ctdb_vnn_map *vnnmap=NULL;
2499         struct ctdb_vnn_map *remote_vnnmap=NULL;
2500         int32_t debug_level;
2501         int i, j, ret;
2502         struct ctdb_recoverd *rec;
2503
2504         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2505
2506         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2507         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2508
2509         rec->ctdb = ctdb;
2510
2511         rec->priority_time = timeval_current();
2512
2513         /* register a message port for sending memory dumps */
2514         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2515
2516         /* register a message port for recovery elections */
2517         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2518
2519         /* when nodes are disabled/enabled */
2520         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2521
2522         /* when we are asked to puch out a flag change */
2523         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2524
2525         /* register a message port for vacuum fetch */
2526         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2527
2528         /* register a message port for reloadnodes  */
2529         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2530
2531         /* register a message port for performing a takeover run */
2532         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2533
2534 again:
2535         if (mem_ctx) {
2536                 talloc_free(mem_ctx);
2537                 mem_ctx = NULL;
2538         }
2539         mem_ctx = talloc_new(ctdb);
2540         if (!mem_ctx) {
2541                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2542                 exit(-1);
2543         }
2544
2545         /* we only check for recovery once every second */
2546         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2547
2548         /* verify that the main daemon is still running */
2549         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2550                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2551                 exit(-1);
2552         }
2553
2554         /* ping the local daemon to tell it we are alive */
2555         ctdb_ctrl_recd_ping(ctdb);
2556
2557         if (rec->election_timeout) {
2558                 /* an election is in progress */
2559                 goto again;
2560         }
2561
2562         /* read the debug level from the parent and update locally */
2563         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2564         if (ret !=0) {
2565                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2566                 goto again;
2567         }
2568         LogLevel = debug_level;
2569
2570
2571         /* We must check if we need to ban a node here but we want to do this
2572            as early as possible so we dont wait until we have pulled the node
2573            map from the local node. thats why we have the hardcoded value 20
2574         */
2575         for (i=0; i<ctdb->num_nodes; i++) {
2576                 struct ctdb_banning_state *ban_state;
2577
2578                 if (ctdb->nodes[i]->ban_state == NULL) {
2579                         continue;
2580                 }
2581                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2582                 if (ban_state->count < 20) {
2583                         continue;
2584                 }
2585                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2586                         ctdb->nodes[i]->pnn, ban_state->count,
2587                         ctdb->tunable.recovery_ban_period));
2588                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2589                 ban_state->count = 0;
2590         }
2591
2592         /* get relevant tunables */
2593         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2594         if (ret != 0) {
2595                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2596                 goto again;
2597         }
2598
2599         /* get the current recovery lock file from the server */
2600         if (update_recovery_lock_file(ctdb) != 0) {
2601                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2602                 goto again;
2603         }
2604
2605         /* Make sure that if recovery lock verification becomes disabled when
2606            we close the file
2607         */
2608         if (ctdb->tunable.verify_recovery_lock == 0) {
2609                 if (ctdb->recovery_lock_fd != -1) {
2610                         close(ctdb->recovery_lock_fd);
2611                         ctdb->recovery_lock_fd = -1;
2612                 }
2613         }
2614
2615         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2616         if (pnn == (uint32_t)-1) {
2617                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2618                 goto again;
2619         }
2620
2621         /* get the vnnmap */
2622         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2623         if (ret != 0) {
2624                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2625                 goto again;
2626         }
2627
2628
2629         /* get number of nodes */
2630         if (rec->nodemap) {
2631                 talloc_free(rec->nodemap);
2632                 rec->nodemap = NULL;
2633                 nodemap=NULL;
2634         }
2635         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2636         if (ret != 0) {
2637                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2638                 goto again;
2639         }
2640         nodemap = rec->nodemap;
2641
2642         /* check which node is the recovery master */
2643         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2644         if (ret != 0) {
2645                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2646                 goto again;
2647         }
2648
2649         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
2650         if (rec->recmaster != pnn) {
2651                 if (rec->ip_reallocate_ctx != NULL) {
2652                         talloc_free(rec->ip_reallocate_ctx);
2653                         rec->ip_reallocate_ctx = NULL;
2654                         rec->reallocate_callers = NULL;
2655                 }
2656         }
2657         /* if there are takeovers requested, perform it and notify the waiters */
2658         if (rec->reallocate_callers) {
2659                 process_ipreallocate_requests(ctdb, rec);
2660         }
2661
2662         if (rec->recmaster == (uint32_t)-1) {
2663                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2664                 force_election(rec, pnn, nodemap);
2665                 goto again;
2666         }
2667
2668
2669         /* if the local daemon is STOPPED, we verify that the databases are
2670            also frozen and thet the recmode is set to active 
2671         */
2672         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
2673                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2674                 if (ret != 0) {
2675                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
2676                 }
2677                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2678                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
2679
2680                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2681                         if (ret != 0) {
2682                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
2683                                 goto again;
2684                         }
2685                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2686                         if (ret != 0) {
2687                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
2688
2689                                 goto again;
2690                         }
2691                         goto again;
2692                 }
2693         }
2694         /* If the local node is stopped, verify we are not the recmaster 
2695            and yield this role if so
2696         */
2697         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
2698                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
2699                 force_election(rec, pnn, nodemap);
2700                 goto again;
2701         }
2702         
2703         /* check that we (recovery daemon) and the local ctdb daemon
2704            agrees on whether we are banned or not
2705         */
2706 //qqq
2707
2708         /* remember our own node flags */
2709         rec->node_flags = nodemap->nodes[pnn].flags;
2710
2711         /* count how many active nodes there are */
2712         rec->num_active    = 0;
2713         rec->num_connected = 0;
2714         for (i=0; i<nodemap->num; i++) {
2715                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2716                         rec->num_active++;
2717                 }
2718                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2719                         rec->num_connected++;
2720                 }
2721         }
2722
2723
2724         /* verify that the recmaster node is still active */
2725         for (j=0; j<nodemap->num; j++) {
2726                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2727                         break;
2728                 }
2729         }
2730
2731         if (j == nodemap->num) {
2732                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2733                 force_election(rec, pnn, nodemap);
2734                 goto again;
2735         }
2736
2737         /* if recovery master is disconnected we must elect a new recmaster */
2738         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2739                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2740                 force_election(rec, pnn, nodemap);
2741                 goto again;
2742         }
2743
2744         /* grap the nodemap from the recovery master to check if it is banned */
2745         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2746                                    mem_ctx, &recmaster_nodemap);
2747         if (ret != 0) {
2748                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2749                           nodemap->nodes[j].pnn));
2750                 goto again;
2751         }
2752
2753
2754         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2755                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2756                 force_election(rec, pnn, nodemap);
2757                 goto again;
2758         }
2759
2760
2761         /* verify that we have all ip addresses we should have and we dont
2762          * have addresses we shouldnt have.
2763          */ 
2764         if (ctdb->do_checkpublicip) {
2765                 if (verify_ip_allocation(ctdb, pnn) != 0) {
2766                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
2767                         goto again;
2768                 }
2769         }
2770
2771
2772         /* if we are not the recmaster then we do not need to check
2773            if recovery is needed
2774          */
2775         if (pnn != rec->recmaster) {
2776                 goto again;
2777         }
2778
2779
2780         /* ensure our local copies of flags are right */
2781         ret = update_local_flags(rec, nodemap);
2782         if (ret == MONITOR_ELECTION_NEEDED) {
2783                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2784                 force_election(rec, pnn, nodemap);
2785                 goto again;
2786         }
2787         if (ret != MONITOR_OK) {
2788                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2789                 goto again;
2790         }
2791
2792         /* update the list of public ips that a node can handle for
2793            all connected nodes
2794         */
2795         if (ctdb->num_nodes != nodemap->num) {
2796                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2797                 reload_nodes_file(ctdb);
2798                 goto again;
2799         }
2800         for (j=0; j<nodemap->num; j++) {
2801                 /* release any existing data */
2802                 if (ctdb->nodes[j]->public_ips) {
2803                         talloc_free(ctdb->nodes[j]->public_ips);
2804                         ctdb->nodes[j]->public_ips = NULL;
2805                 }
2806
2807                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2808                         continue;
2809                 }
2810
2811                 /* grab a new shiny list of public ips from the node */
2812                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
2813                         ctdb->nodes[j]->pnn, 
2814                         ctdb->nodes,
2815                         &ctdb->nodes[j]->public_ips)) {
2816                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
2817                                 ctdb->nodes[j]->pnn));
2818                         goto again;
2819                 }
2820         }
2821
2822
2823         /* verify that all active nodes agree that we are the recmaster */
2824         switch (verify_recmaster(rec, nodemap, pnn)) {
2825         case MONITOR_RECOVERY_NEEDED:
2826                 /* can not happen */
2827                 goto again;
2828         case MONITOR_ELECTION_NEEDED:
2829                 force_election(rec, pnn, nodemap);
2830                 goto again;
2831         case MONITOR_OK:
2832                 break;
2833         case MONITOR_FAILED:
2834                 goto again;
2835         }
2836
2837
2838         if (rec->need_recovery) {
2839                 /* a previous recovery didn't finish */
2840                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2841                 goto again;             
2842         }
2843
2844         /* verify that all active nodes are in normal mode 
2845            and not in recovery mode 
2846         */
2847         switch (verify_recmode(ctdb, nodemap)) {
2848         case MONITOR_RECOVERY_NEEDED:
2849                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2850                 goto again;
2851         case MONITOR_FAILED:
2852                 goto again;
2853         case MONITOR_ELECTION_NEEDED:
2854                 /* can not happen */
2855         case MONITOR_OK:
2856                 break;
2857         }
2858
2859
2860         if (ctdb->tunable.verify_recovery_lock != 0) {
2861                 /* we should have the reclock - check its not stale */
2862                 ret = check_recovery_lock(ctdb);
2863                 if (ret != 0) {
2864                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
2865                         ctdb_set_culprit(rec, ctdb->pnn);
2866                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2867                         goto again;
2868                 }
2869         }
2870
2871         /* get the nodemap for all active remote nodes
2872          */
2873         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
2874         if (remote_nodemaps == NULL) {
2875                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
2876                 goto again;
2877         }
2878         for(i=0; i<nodemap->num; i++) {
2879                 remote_nodemaps[i] = NULL;
2880         }
2881         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
2882                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
2883                 goto again;
2884         } 
2885
2886         /* verify that all other nodes have the same nodemap as we have
2887         */
2888         for (j=0; j<nodemap->num; j++) {
2889                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2890                         continue;
2891                 }
2892
2893                 if (remote_nodemaps[j] == NULL) {
2894                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
2895                         ctdb_set_culprit(rec, j);
2896
2897                         goto again;
2898                 }
2899
2900                 /* if the nodes disagree on how many nodes there are
2901                    then this is a good reason to try recovery
2902                  */
2903                 if (remote_nodemaps[j]->num != nodemap->num) {
2904                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2905                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
2906                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2907                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2908                         goto again;
2909                 }
2910
2911                 /* if the nodes disagree on which nodes exist and are
2912                    active, then that is also a good reason to do recovery
2913                  */
2914                 for (i=0;i<nodemap->num;i++) {
2915                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
2916                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2917                                           nodemap->nodes[j].pnn, i, 
2918                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
2919                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2920                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2921                                             vnnmap);
2922                                 goto again;
2923                         }
2924                 }
2925
2926                 /* verify the flags are consistent
2927                 */
2928                 for (i=0; i<nodemap->num; i++) {
2929                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2930                                 continue;
2931                         }
2932                         
2933                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
2934                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
2935                                   nodemap->nodes[j].pnn, 
2936                                   nodemap->nodes[i].pnn, 
2937                                   remote_nodemaps[j]->nodes[i].flags,
2938                                   nodemap->nodes[j].flags));
2939                                 if (i == j) {
2940                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
2941                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
2942                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2943                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2944                                                     vnnmap);
2945                                         goto again;
2946                                 } else {
2947                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
2948                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
2949                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2950                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2951                                                     vnnmap);
2952                                         goto again;
2953                                 }
2954                         }
2955                 }
2956         }
2957
2958
2959         /* there better be the same number of lmasters in the vnn map
2960            as there are active nodes or we will have to do a recovery
2961          */
2962         if (vnnmap->size != rec->num_active) {
2963                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
2964                           vnnmap->size, rec->num_active));
2965                 ctdb_set_culprit(rec, ctdb->pnn);
2966                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2967                 goto again;
2968         }
2969
2970         /* verify that all active nodes in the nodemap also exist in 
2971            the vnnmap.
2972          */
2973         for (j=0; j<nodemap->num; j++) {
2974                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2975                         continue;
2976                 }
2977                 if (nodemap->nodes[j].pnn == pnn) {
2978                         continue;
2979                 }
2980
2981                 for (i=0; i<vnnmap->size; i++) {
2982                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2983                                 break;
2984                         }
2985                 }
2986                 if (i == vnnmap->size) {
2987                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
2988                                   nodemap->nodes[j].pnn));
2989                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2990                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2991                         goto again;
2992                 }
2993         }
2994
2995         
2996         /* verify that all other nodes have the same vnnmap
2997            and are from the same generation
2998          */
2999         for (j=0; j<nodemap->num; j++) {
3000                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3001                         continue;
3002                 }
3003                 if (nodemap->nodes[j].pnn == pnn) {
3004                         continue;
3005                 }
3006
3007                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3008                                           mem_ctx, &remote_vnnmap);
3009                 if (ret != 0) {
3010                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3011                                   nodemap->nodes[j].pnn));
3012                         goto again;
3013                 }
3014
3015                 /* verify the vnnmap generation is the same */
3016                 if (vnnmap->generation != remote_vnnmap->generation) {
3017                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3018                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3019                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3020                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3021                         goto again;
3022                 }
3023
3024                 /* verify the vnnmap size is the same */
3025                 if (vnnmap->size != remote_vnnmap->size) {
3026                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3027                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3028                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3029                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3030                         goto again;
3031                 }
3032
3033                 /* verify the vnnmap is the same */
3034                 for (i=0;i<vnnmap->size;i++) {
3035                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3036                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3037                                           nodemap->nodes[j].pnn));
3038                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3039                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3040                                             vnnmap);
3041                                 goto again;
3042                         }
3043                 }
3044         }
3045
3046         /* we might need to change who has what IP assigned */
3047         if (rec->need_takeover_run) {
3048                 rec->need_takeover_run = false;
3049
3050                 /* execute the "startrecovery" event script on all nodes */
3051                 ret = run_startrecovery_eventscript(rec, nodemap);
3052                 if (ret!=0) {
3053                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3054                         ctdb_set_culprit(rec, ctdb->pnn);
3055                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3056                 }
3057
3058                 ret = ctdb_takeover_run(ctdb, nodemap);
3059                 if (ret != 0) {
3060                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3061                         ctdb_set_culprit(rec, ctdb->pnn);
3062                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3063                 }
3064
3065                 /* execute the "recovered" event script on all nodes */
3066                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3067 #if 0
3068 // we cant check whether the event completed successfully
3069 // since this script WILL fail if the node is in recovery mode
3070 // and if that race happens, the code here would just cause a second
3071 // cascading recovery.
3072                 if (ret!=0) {
3073                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3074                         ctdb_set_culprit(rec, ctdb->pnn);
3075                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3076                 }
3077 #endif
3078         }
3079
3080
3081         goto again;
3082
3083 }
3084
3085 /*
3086   event handler for when the main ctdbd dies
3087  */
3088 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3089                                  uint16_t flags, void *private_data)
3090 {
3091         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3092         _exit(1);
3093 }
3094
3095 /*
3096   called regularly to verify that the recovery daemon is still running
3097  */
3098 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3099                               struct timeval yt, void *p)
3100 {
3101         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3102
3103         if (kill(ctdb->recoverd_pid, 0) != 0) {
3104                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3105
3106                 ctdb_stop_recoverd(ctdb);
3107                 ctdb_stop_keepalive(ctdb);
3108                 ctdb_stop_monitoring(ctdb);
3109                 ctdb_release_all_ips(ctdb);
3110                 if (ctdb->methods != NULL) {
3111                         ctdb->methods->shutdown(ctdb);
3112                 }
3113                 ctdb_event_script(ctdb, "shutdown");
3114
3115                 exit(10);       
3116         }
3117
3118         event_add_timed(ctdb->ev, ctdb, 
3119                         timeval_current_ofs(30, 0),
3120                         ctdb_check_recd, ctdb);
3121 }
3122
3123 static void recd_sig_child_handler(struct event_context *ev,
3124         struct signal_event *se, int signum, int count,
3125         void *dont_care, 
3126         void *private_data)
3127 {
3128 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3129         int status;
3130         pid_t pid = -1;
3131
3132         while (pid != 0) {
3133                 pid = waitpid(-1, &status, WNOHANG);
3134                 if (pid == -1) {
3135                         if (errno != ECHILD) {
3136                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3137                         }
3138                         return;
3139                 }
3140                 if (pid > 0) {
3141                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3142                 }
3143         }
3144 }
3145
3146 /*
3147   startup the recovery daemon as a child of the main ctdb daemon
3148  */
3149 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3150 {
3151         int fd[2];
3152         struct signal_event *se;
3153
3154         if (pipe(fd) != 0) {
3155                 return -1;
3156         }
3157
3158         ctdb->ctdbd_pid = getpid();
3159
3160         ctdb->recoverd_pid = fork();
3161         if (ctdb->recoverd_pid == -1) {
3162                 return -1;
3163         }
3164         
3165         if (ctdb->recoverd_pid != 0) {
3166                 close(fd[0]);
3167                 event_add_timed(ctdb->ev, ctdb, 
3168                                 timeval_current_ofs(30, 0),
3169                                 ctdb_check_recd, ctdb);
3170                 return 0;
3171         }
3172
3173         close(fd[1]);
3174
3175         srandom(getpid() ^ time(NULL));
3176
3177         if (switch_from_server_to_client(ctdb) != 0) {
3178                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3179                 exit(1);
3180         }
3181
3182         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3183                      ctdb_recoverd_parent, &fd[0]);     
3184
3185         /* set up a handler to pick up sigchld */
3186         se = event_add_signal(ctdb->ev, ctdb,
3187                                      SIGCHLD, 0,
3188                                      recd_sig_child_handler,
3189                                      ctdb);
3190         if (se == NULL) {
3191                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3192                 exit(1);
3193         }
3194
3195         monitor_cluster(ctdb);
3196
3197         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3198         return -1;
3199 }
3200
3201 /*
3202   shutdown the recovery daemon
3203  */
3204 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3205 {
3206         if (ctdb->recoverd_pid == 0) {
3207                 return;
3208         }
3209
3210         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3211         kill(ctdb->recoverd_pid, SIGTERM);
3212 }