dont force an election just because the ban flag differs across the cluster.
[metze/ctdb/wip.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66 };
67
68 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
69 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
70
71
72 /*
73   ban a node for a period of time
74  */
75 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
76 {
77         int ret;
78         struct ctdb_context *ctdb = rec->ctdb;
79         struct ctdb_ban_time bantime;
80        
81         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
82
83         if (!ctdb_validate_pnn(ctdb, pnn)) {
84                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
85                 return;
86         }
87
88         bantime.pnn  = pnn;
89         bantime.time = ban_time;
90
91         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
92         if (ret != 0) {
93                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
94                 return;
95         }
96
97 }
98
99 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
100
101
102 /*
103   run the "recovered" eventscript on all nodes
104  */
105 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
106 {
107         TALLOC_CTX *tmp_ctx;
108         uint32_t *nodes;
109
110         tmp_ctx = talloc_new(ctdb);
111         CTDB_NO_MEMORY(ctdb, tmp_ctx);
112
113         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
114         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
115                                         nodes,
116                                         CONTROL_TIMEOUT(), false, tdb_null,
117                                         NULL, NULL,
118                                         NULL) != 0) {
119                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
120
121                 talloc_free(tmp_ctx);
122                 return -1;
123         }
124
125         talloc_free(tmp_ctx);
126         return 0;
127 }
128
129 /*
130   remember the trouble maker
131  */
132 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
133 {
134         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
135         struct ctdb_banning_state *ban_state;
136
137         if (culprit > ctdb->num_nodes) {
138                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
139                 return;
140         }
141
142         if (ctdb->nodes[culprit]->ban_state == NULL) {
143                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
144                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
145
146                 
147         }
148         ban_state = ctdb->nodes[culprit]->ban_state;
149         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
150                 /* this was the first time in a long while this node
151                    misbehaved so we will forgive any old transgressions.
152                 */
153                 ban_state->count = 0;
154         }
155
156         ban_state->count += count;
157         ban_state->last_reported_time = timeval_current();
158         rec->last_culprit_node = culprit;
159 }
160
161 /*
162   remember the trouble maker
163  */
164 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
165 {
166         ctdb_set_culprit_count(rec, culprit, 1);
167 }
168
169
170 /* this callback is called for every node that failed to execute the
171    start recovery event
172 */
173 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
174 {
175         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
176
177         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
178
179         ctdb_set_culprit(rec, node_pnn);
180 }
181
182 /*
183   run the "startrecovery" eventscript on all nodes
184  */
185 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
186 {
187         TALLOC_CTX *tmp_ctx;
188         uint32_t *nodes;
189         struct ctdb_context *ctdb = rec->ctdb;
190
191         tmp_ctx = talloc_new(ctdb);
192         CTDB_NO_MEMORY(ctdb, tmp_ctx);
193
194         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
195         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
196                                         nodes,
197                                         CONTROL_TIMEOUT(), false, tdb_null,
198                                         NULL,
199                                         startrecovery_fail_callback,
200                                         rec) != 0) {
201                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
202                 talloc_free(tmp_ctx);
203                 return -1;
204         }
205
206         talloc_free(tmp_ctx);
207         return 0;
208 }
209
210 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
211 {
212         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
213                 DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
214                 return;
215         }
216         if (node_pnn < ctdb->num_nodes) {
217                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
218         }
219 }
220
221 /*
222   update the node capabilities for all connected nodes
223  */
224 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
225 {
226         uint32_t *nodes;
227         TALLOC_CTX *tmp_ctx;
228
229         tmp_ctx = talloc_new(ctdb);
230         CTDB_NO_MEMORY(ctdb, tmp_ctx);
231
232         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
233         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
234                                         nodes, CONTROL_TIMEOUT(),
235                                         false, tdb_null,
236                                         async_getcap_callback, NULL,
237                                         NULL) != 0) {
238                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
239                 talloc_free(tmp_ctx);
240                 return -1;
241         }
242
243         talloc_free(tmp_ctx);
244         return 0;
245 }
246
247 /*
248   change recovery mode on all nodes
249  */
250 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
251 {
252         TDB_DATA data;
253         uint32_t *nodes;
254         TALLOC_CTX *tmp_ctx;
255
256         tmp_ctx = talloc_new(ctdb);
257         CTDB_NO_MEMORY(ctdb, tmp_ctx);
258
259         /* freeze all nodes */
260         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
261         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
262                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
263                                                 nodes, CONTROL_TIMEOUT(),
264                                                 false, tdb_null,
265                                                 NULL, NULL,
266                                                 NULL) != 0) {
267                         DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
268                         talloc_free(tmp_ctx);
269                         return -1;
270                 }
271         }
272
273
274         data.dsize = sizeof(uint32_t);
275         data.dptr = (unsigned char *)&rec_mode;
276
277         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
278                                         nodes, CONTROL_TIMEOUT(),
279                                         false, data,
280                                         NULL, NULL,
281                                         NULL) != 0) {
282                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
283                 talloc_free(tmp_ctx);
284                 return -1;
285         }
286
287         talloc_free(tmp_ctx);
288         return 0;
289 }
290
291 /*
292   change recovery master on all node
293  */
294 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
295 {
296         TDB_DATA data;
297         TALLOC_CTX *tmp_ctx;
298         uint32_t *nodes;
299
300         tmp_ctx = talloc_new(ctdb);
301         CTDB_NO_MEMORY(ctdb, tmp_ctx);
302
303         data.dsize = sizeof(uint32_t);
304         data.dptr = (unsigned char *)&pnn;
305
306         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
307         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
308                                         nodes,
309                                         CONTROL_TIMEOUT(), false, data,
310                                         NULL, NULL,
311                                         NULL) != 0) {
312                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
313                 talloc_free(tmp_ctx);
314                 return -1;
315         }
316
317         talloc_free(tmp_ctx);
318         return 0;
319 }
320
321
322 /*
323   ensure all other nodes have attached to any databases that we have
324  */
325 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
326                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
327 {
328         int i, j, db, ret;
329         struct ctdb_dbid_map *remote_dbmap;
330
331         /* verify that all other nodes have all our databases */
332         for (j=0; j<nodemap->num; j++) {
333                 /* we dont need to ourself ourselves */
334                 if (nodemap->nodes[j].pnn == pnn) {
335                         continue;
336                 }
337                 /* dont check nodes that are unavailable */
338                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
339                         continue;
340                 }
341
342                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
343                                          mem_ctx, &remote_dbmap);
344                 if (ret != 0) {
345                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
346                         return -1;
347                 }
348
349                 /* step through all local databases */
350                 for (db=0; db<dbmap->num;db++) {
351                         const char *name;
352
353
354                         for (i=0;i<remote_dbmap->num;i++) {
355                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
356                                         break;
357                                 }
358                         }
359                         /* the remote node already have this database */
360                         if (i!=remote_dbmap->num) {
361                                 continue;
362                         }
363                         /* ok so we need to create this database */
364                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
365                                             mem_ctx, &name);
366                         if (ret != 0) {
367                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
368                                 return -1;
369                         }
370                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
371                                            mem_ctx, name, dbmap->dbs[db].persistent);
372                         if (ret != 0) {
373                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
374                                 return -1;
375                         }
376                 }
377         }
378
379         return 0;
380 }
381
382
383 /*
384   ensure we are attached to any databases that anyone else is attached to
385  */
386 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
387                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
388 {
389         int i, j, db, ret;
390         struct ctdb_dbid_map *remote_dbmap;
391
392         /* verify that we have all database any other node has */
393         for (j=0; j<nodemap->num; j++) {
394                 /* we dont need to ourself ourselves */
395                 if (nodemap->nodes[j].pnn == pnn) {
396                         continue;
397                 }
398                 /* dont check nodes that are unavailable */
399                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
400                         continue;
401                 }
402
403                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
404                                          mem_ctx, &remote_dbmap);
405                 if (ret != 0) {
406                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
407                         return -1;
408                 }
409
410                 /* step through all databases on the remote node */
411                 for (db=0; db<remote_dbmap->num;db++) {
412                         const char *name;
413
414                         for (i=0;i<(*dbmap)->num;i++) {
415                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
416                                         break;
417                                 }
418                         }
419                         /* we already have this db locally */
420                         if (i!=(*dbmap)->num) {
421                                 continue;
422                         }
423                         /* ok so we need to create this database and
424                            rebuild dbmap
425                          */
426                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
427                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
428                         if (ret != 0) {
429                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
430                                           nodemap->nodes[j].pnn));
431                                 return -1;
432                         }
433                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
434                                            remote_dbmap->dbs[db].persistent);
435                         if (ret != 0) {
436                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
437                                 return -1;
438                         }
439                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
440                         if (ret != 0) {
441                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
442                                 return -1;
443                         }
444                 }
445         }
446
447         return 0;
448 }
449
450
451 /*
452   pull the remote database contents from one node into the recdb
453  */
454 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
455                                     struct tdb_wrap *recdb, uint32_t dbid)
456 {
457         int ret;
458         TDB_DATA outdata;
459         struct ctdb_marshall_buffer *reply;
460         struct ctdb_rec_data *rec;
461         int i;
462         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
463
464         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
465                                CONTROL_TIMEOUT(), &outdata);
466         if (ret != 0) {
467                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
468                 talloc_free(tmp_ctx);
469                 return -1;
470         }
471
472         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
473
474         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
475                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
476                 talloc_free(tmp_ctx);
477                 return -1;
478         }
479         
480         rec = (struct ctdb_rec_data *)&reply->data[0];
481         
482         for (i=0;
483              i<reply->count;
484              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
485                 TDB_DATA key, data;
486                 struct ctdb_ltdb_header *hdr;
487                 TDB_DATA existing;
488                 
489                 key.dptr = &rec->data[0];
490                 key.dsize = rec->keylen;
491                 data.dptr = &rec->data[key.dsize];
492                 data.dsize = rec->datalen;
493                 
494                 hdr = (struct ctdb_ltdb_header *)data.dptr;
495
496                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
497                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
498                         talloc_free(tmp_ctx);
499                         return -1;
500                 }
501
502                 /* fetch the existing record, if any */
503                 existing = tdb_fetch(recdb->tdb, key);
504                 
505                 if (existing.dptr != NULL) {
506                         struct ctdb_ltdb_header header;
507                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
508                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
509                                          (unsigned)existing.dsize, srcnode));
510                                 free(existing.dptr);
511                                 talloc_free(tmp_ctx);
512                                 return -1;
513                         }
514                         header = *(struct ctdb_ltdb_header *)existing.dptr;
515                         free(existing.dptr);
516                         if (!(header.rsn < hdr->rsn ||
517                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
518                                 continue;
519                         }
520                 }
521                 
522                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
523                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
524                         talloc_free(tmp_ctx);
525                         return -1;                              
526                 }
527         }
528
529         talloc_free(tmp_ctx);
530
531         return 0;
532 }
533
534 /*
535   pull all the remote database contents into the recdb
536  */
537 static int pull_remote_database(struct ctdb_context *ctdb,
538                                 struct ctdb_recoverd *rec, 
539                                 struct ctdb_node_map *nodemap, 
540                                 struct tdb_wrap *recdb, uint32_t dbid)
541 {
542         int j;
543
544         /* pull all records from all other nodes across onto this node
545            (this merges based on rsn)
546         */
547         for (j=0; j<nodemap->num; j++) {
548                 /* dont merge from nodes that are unavailable */
549                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
550                         continue;
551                 }
552                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
553                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
554                                  nodemap->nodes[j].pnn));
555                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
556                         return -1;
557                 }
558         }
559         
560         return 0;
561 }
562
563
564 /*
565   update flags on all active nodes
566  */
567 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
568 {
569         int ret;
570
571         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
572                 if (ret != 0) {
573                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
574                 return -1;
575         }
576
577         return 0;
578 }
579
580 /*
581   ensure all nodes have the same vnnmap we do
582  */
583 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
584                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
585 {
586         int j, ret;
587
588         /* push the new vnn map out to all the nodes */
589         for (j=0; j<nodemap->num; j++) {
590                 /* dont push to nodes that are unavailable */
591                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
592                         continue;
593                 }
594
595                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
596                 if (ret != 0) {
597                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
598                         return -1;
599                 }
600         }
601
602         return 0;
603 }
604
605
606 struct vacuum_info {
607         struct vacuum_info *next, *prev;
608         struct ctdb_recoverd *rec;
609         uint32_t srcnode;
610         struct ctdb_db_context *ctdb_db;
611         struct ctdb_marshall_buffer *recs;
612         struct ctdb_rec_data *r;
613 };
614
615 static void vacuum_fetch_next(struct vacuum_info *v);
616
617 /*
618   called when a vacuum fetch has completed - just free it and do the next one
619  */
620 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
621 {
622         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
623         talloc_free(state);
624         vacuum_fetch_next(v);
625 }
626
627
628 /*
629   process the next element from the vacuum list
630 */
631 static void vacuum_fetch_next(struct vacuum_info *v)
632 {
633         struct ctdb_call call;
634         struct ctdb_rec_data *r;
635
636         while (v->recs->count) {
637                 struct ctdb_client_call_state *state;
638                 TDB_DATA data;
639                 struct ctdb_ltdb_header *hdr;
640
641                 ZERO_STRUCT(call);
642                 call.call_id = CTDB_NULL_FUNC;
643                 call.flags = CTDB_IMMEDIATE_MIGRATION;
644
645                 r = v->r;
646                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
647                 v->recs->count--;
648
649                 call.key.dptr = &r->data[0];
650                 call.key.dsize = r->keylen;
651
652                 /* ensure we don't block this daemon - just skip a record if we can't get
653                    the chainlock */
654                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
655                         continue;
656                 }
657
658                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
659                 if (data.dptr == NULL) {
660                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
661                         continue;
662                 }
663
664                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
665                         free(data.dptr);
666                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
667                         continue;
668                 }
669                 
670                 hdr = (struct ctdb_ltdb_header *)data.dptr;
671                 if (hdr->dmaster == v->rec->ctdb->pnn) {
672                         /* its already local */
673                         free(data.dptr);
674                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
675                         continue;
676                 }
677
678                 free(data.dptr);
679
680                 state = ctdb_call_send(v->ctdb_db, &call);
681                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
682                 if (state == NULL) {
683                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
684                         talloc_free(v);
685                         return;
686                 }
687                 state->async.fn = vacuum_fetch_callback;
688                 state->async.private_data = v;
689                 return;
690         }
691
692         talloc_free(v);
693 }
694
695
696 /*
697   destroy a vacuum info structure
698  */
699 static int vacuum_info_destructor(struct vacuum_info *v)
700 {
701         DLIST_REMOVE(v->rec->vacuum_info, v);
702         return 0;
703 }
704
705
706 /*
707   handler for vacuum fetch
708 */
709 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
710                                  TDB_DATA data, void *private_data)
711 {
712         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
713         struct ctdb_marshall_buffer *recs;
714         int ret, i;
715         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
716         const char *name;
717         struct ctdb_dbid_map *dbmap=NULL;
718         bool persistent = false;
719         struct ctdb_db_context *ctdb_db;
720         struct ctdb_rec_data *r;
721         uint32_t srcnode;
722         struct vacuum_info *v;
723
724         recs = (struct ctdb_marshall_buffer *)data.dptr;
725         r = (struct ctdb_rec_data *)&recs->data[0];
726
727         if (recs->count == 0) {
728                 talloc_free(tmp_ctx);
729                 return;
730         }
731
732         srcnode = r->reqid;
733
734         for (v=rec->vacuum_info;v;v=v->next) {
735                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
736                         /* we're already working on records from this node */
737                         talloc_free(tmp_ctx);
738                         return;
739                 }
740         }
741
742         /* work out if the database is persistent */
743         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
744         if (ret != 0) {
745                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
746                 talloc_free(tmp_ctx);
747                 return;
748         }
749
750         for (i=0;i<dbmap->num;i++) {
751                 if (dbmap->dbs[i].dbid == recs->db_id) {
752                         persistent = dbmap->dbs[i].persistent;
753                         break;
754                 }
755         }
756         if (i == dbmap->num) {
757                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
758                 talloc_free(tmp_ctx);
759                 return;         
760         }
761
762         /* find the name of this database */
763         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
764                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
765                 talloc_free(tmp_ctx);
766                 return;
767         }
768
769         /* attach to it */
770         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
771         if (ctdb_db == NULL) {
772                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
773                 talloc_free(tmp_ctx);
774                 return;
775         }
776
777         v = talloc_zero(rec, struct vacuum_info);
778         if (v == NULL) {
779                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
780                 talloc_free(tmp_ctx);
781                 return;
782         }
783
784         v->rec = rec;
785         v->srcnode = srcnode;
786         v->ctdb_db = ctdb_db;
787         v->recs = talloc_memdup(v, recs, data.dsize);
788         if (v->recs == NULL) {
789                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
790                 talloc_free(v);
791                 talloc_free(tmp_ctx);
792                 return;         
793         }
794         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
795
796         DLIST_ADD(rec->vacuum_info, v);
797
798         talloc_set_destructor(v, vacuum_info_destructor);
799
800         vacuum_fetch_next(v);
801         talloc_free(tmp_ctx);
802 }
803
804
805 /*
806   called when ctdb_wait_timeout should finish
807  */
808 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
809                               struct timeval yt, void *p)
810 {
811         uint32_t *timed_out = (uint32_t *)p;
812         (*timed_out) = 1;
813 }
814
815 /*
816   wait for a given number of seconds
817  */
818 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
819 {
820         uint32_t timed_out = 0;
821         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
822         while (!timed_out) {
823                 event_loop_once(ctdb->ev);
824         }
825 }
826
827 /*
828   called when an election times out (ends)
829  */
830 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
831                                   struct timeval t, void *p)
832 {
833         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
834         rec->election_timeout = NULL;
835
836         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
837 }
838
839
840 /*
841   wait for an election to finish. It finished election_timeout seconds after
842   the last election packet is received
843  */
844 static void ctdb_wait_election(struct ctdb_recoverd *rec)
845 {
846         struct ctdb_context *ctdb = rec->ctdb;
847         while (rec->election_timeout) {
848                 event_loop_once(ctdb->ev);
849         }
850 }
851
852 /*
853   Update our local flags from all remote connected nodes. 
854   This is only run when we are or we belive we are the recovery master
855  */
856 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
857 {
858         int j;
859         struct ctdb_context *ctdb = rec->ctdb;
860         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
861
862         /* get the nodemap for all active remote nodes and verify
863            they are the same as for this node
864          */
865         for (j=0; j<nodemap->num; j++) {
866                 struct ctdb_node_map *remote_nodemap=NULL;
867                 int ret;
868
869                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
870                         continue;
871                 }
872                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
873                         continue;
874                 }
875
876                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
877                                            mem_ctx, &remote_nodemap);
878                 if (ret != 0) {
879                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
880                                   nodemap->nodes[j].pnn));
881                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
882                         talloc_free(mem_ctx);
883                         return MONITOR_FAILED;
884                 }
885                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
886                         /* We should tell our daemon about this so it
887                            updates its flags or else we will log the same 
888                            message again in the next iteration of recovery.
889                            Since we are the recovery master we can just as
890                            well update the flags on all nodes.
891                         */
892                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
893                         if (ret != 0) {
894                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
895                                 return -1;
896                         }
897
898                         /* Update our local copy of the flags in the recovery
899                            daemon.
900                         */
901                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
902                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
903                                  nodemap->nodes[j].flags));
904                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
905                 }
906                 talloc_free(remote_nodemap);
907         }
908         talloc_free(mem_ctx);
909         return MONITOR_OK;
910 }
911
912
913 /* Create a new random generation ip. 
914    The generation id can not be the INVALID_GENERATION id
915 */
916 static uint32_t new_generation(void)
917 {
918         uint32_t generation;
919
920         while (1) {
921                 generation = random();
922
923                 if (generation != INVALID_GENERATION) {
924                         break;
925                 }
926         }
927
928         return generation;
929 }
930
931
932 /*
933   create a temporary working database
934  */
935 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
936 {
937         char *name;
938         struct tdb_wrap *recdb;
939         unsigned tdb_flags;
940
941         /* open up the temporary recovery database */
942         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
943         if (name == NULL) {
944                 return NULL;
945         }
946         unlink(name);
947
948         tdb_flags = TDB_NOLOCK;
949         if (!ctdb->do_setsched) {
950                 tdb_flags |= TDB_NOMMAP;
951         }
952
953         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
954                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
955         if (recdb == NULL) {
956                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
957         }
958
959         talloc_free(name);
960
961         return recdb;
962 }
963
964
965 /* 
966    a traverse function for pulling all relevent records from recdb
967  */
968 struct recdb_data {
969         struct ctdb_context *ctdb;
970         struct ctdb_marshall_buffer *recdata;
971         uint32_t len;
972         bool failed;
973 };
974
975 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
976 {
977         struct recdb_data *params = (struct recdb_data *)p;
978         struct ctdb_rec_data *rec;
979         struct ctdb_ltdb_header *hdr;
980
981         /* skip empty records */
982         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
983                 return 0;
984         }
985
986         /* update the dmaster field to point to us */
987         hdr = (struct ctdb_ltdb_header *)data.dptr;
988         hdr->dmaster = params->ctdb->pnn;
989
990         /* add the record to the blob ready to send to the nodes */
991         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
992         if (rec == NULL) {
993                 params->failed = true;
994                 return -1;
995         }
996         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
997         if (params->recdata == NULL) {
998                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
999                          rec->length + params->len, params->recdata->count));
1000                 params->failed = true;
1001                 return -1;
1002         }
1003         params->recdata->count++;
1004         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1005         params->len += rec->length;
1006         talloc_free(rec);
1007
1008         return 0;
1009 }
1010
1011 /*
1012   push the recdb database out to all nodes
1013  */
1014 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1015                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1016 {
1017         struct recdb_data params;
1018         struct ctdb_marshall_buffer *recdata;
1019         TDB_DATA outdata;
1020         TALLOC_CTX *tmp_ctx;
1021         uint32_t *nodes;
1022
1023         tmp_ctx = talloc_new(ctdb);
1024         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1025
1026         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1027         CTDB_NO_MEMORY(ctdb, recdata);
1028
1029         recdata->db_id = dbid;
1030
1031         params.ctdb = ctdb;
1032         params.recdata = recdata;
1033         params.len = offsetof(struct ctdb_marshall_buffer, data);
1034         params.failed = false;
1035
1036         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1037                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1038                 talloc_free(params.recdata);
1039                 talloc_free(tmp_ctx);
1040                 return -1;
1041         }
1042
1043         if (params.failed) {
1044                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1045                 talloc_free(params.recdata);
1046                 talloc_free(tmp_ctx);
1047                 return -1;              
1048         }
1049
1050         recdata = params.recdata;
1051
1052         outdata.dptr = (void *)recdata;
1053         outdata.dsize = params.len;
1054
1055         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1056         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1057                                         nodes,
1058                                         CONTROL_TIMEOUT(), false, outdata,
1059                                         NULL, NULL,
1060                                         NULL) != 0) {
1061                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1062                 talloc_free(recdata);
1063                 talloc_free(tmp_ctx);
1064                 return -1;
1065         }
1066
1067         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1068                   dbid, recdata->count));
1069
1070         talloc_free(recdata);
1071         talloc_free(tmp_ctx);
1072
1073         return 0;
1074 }
1075
1076
1077 /*
1078   go through a full recovery on one database 
1079  */
1080 static int recover_database(struct ctdb_recoverd *rec, 
1081                             TALLOC_CTX *mem_ctx,
1082                             uint32_t dbid,
1083                             uint32_t pnn, 
1084                             struct ctdb_node_map *nodemap,
1085                             uint32_t transaction_id)
1086 {
1087         struct tdb_wrap *recdb;
1088         int ret;
1089         struct ctdb_context *ctdb = rec->ctdb;
1090         TDB_DATA data;
1091         struct ctdb_control_wipe_database w;
1092         uint32_t *nodes;
1093
1094         recdb = create_recdb(ctdb, mem_ctx);
1095         if (recdb == NULL) {
1096                 return -1;
1097         }
1098
1099         /* pull all remote databases onto the recdb */
1100         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid);
1101         if (ret != 0) {
1102                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1103                 return -1;
1104         }
1105
1106         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1107
1108         /* wipe all the remote databases. This is safe as we are in a transaction */
1109         w.db_id = dbid;
1110         w.transaction_id = transaction_id;
1111
1112         data.dptr = (void *)&w;
1113         data.dsize = sizeof(w);
1114
1115         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1116         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1117                                         nodes,
1118                                         CONTROL_TIMEOUT(), false, data,
1119                                         NULL, NULL,
1120                                         NULL) != 0) {
1121                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1122                 talloc_free(recdb);
1123                 return -1;
1124         }
1125         
1126         /* push out the correct database. This sets the dmaster and skips 
1127            the empty records */
1128         ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
1129         if (ret != 0) {
1130                 talloc_free(recdb);
1131                 return -1;
1132         }
1133
1134         /* all done with this database */
1135         talloc_free(recdb);
1136
1137         return 0;
1138 }
1139
1140 /*
1141   reload the nodes file 
1142 */
1143 static void reload_nodes_file(struct ctdb_context *ctdb)
1144 {
1145         ctdb->nodes = NULL;
1146         ctdb_load_nodes_file(ctdb);
1147 }
1148
1149         
1150 /*
1151   we are the recmaster, and recovery is needed - start a recovery run
1152  */
1153 static int do_recovery(struct ctdb_recoverd *rec, 
1154                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1155                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1156 {
1157         struct ctdb_context *ctdb = rec->ctdb;
1158         int i, j, ret;
1159         uint32_t generation;
1160         struct ctdb_dbid_map *dbmap;
1161         TDB_DATA data;
1162         uint32_t *nodes;
1163         struct timeval start_time;
1164
1165         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1166
1167         /* if recovery fails, force it again */
1168         rec->need_recovery = true;
1169
1170         for (i=0; i<ctdb->num_nodes; i++) {
1171                 struct ctdb_banning_state *ban_state;
1172
1173                 if (ctdb->nodes[i]->ban_state == NULL) {
1174                         continue;
1175                 }
1176                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1177                 if (ban_state->count < 2*ctdb->num_nodes) {
1178                         continue;
1179                 }
1180                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1181                         ctdb->nodes[i]->pnn, ban_state->count,
1182                         ctdb->tunable.recovery_ban_period));
1183                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1184                 ban_state->count = 0;
1185         }
1186
1187
1188         if (ctdb->tunable.verify_recovery_lock != 0) {
1189                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1190                 start_time = timeval_current();
1191                 if (!ctdb_recovery_lock(ctdb, true)) {
1192                         ctdb_set_culprit(rec, pnn);
1193                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1194                         return -1;
1195                 }
1196                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1197                 DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1198         }
1199
1200         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1201
1202         /* get a list of all databases */
1203         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1204         if (ret != 0) {
1205                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1206                 return -1;
1207         }
1208
1209         /* we do the db creation before we set the recovery mode, so the freeze happens
1210            on all databases we will be dealing with. */
1211
1212         /* verify that we have all the databases any other node has */
1213         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1214         if (ret != 0) {
1215                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1216                 return -1;
1217         }
1218
1219         /* verify that all other nodes have all our databases */
1220         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1221         if (ret != 0) {
1222                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1223                 return -1;
1224         }
1225
1226         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1227
1228
1229         /* set recovery mode to active on all nodes */
1230         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1231         if (ret != 0) {
1232                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1233                 return -1;
1234         }
1235
1236         /* execute the "startrecovery" event script on all nodes */
1237         ret = run_startrecovery_eventscript(rec, nodemap);
1238         if (ret!=0) {
1239                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1240                 return -1;
1241         }
1242
1243         /* pick a new generation number */
1244         generation = new_generation();
1245
1246         /* change the vnnmap on this node to use the new generation 
1247            number but not on any other nodes.
1248            this guarantees that if we abort the recovery prematurely
1249            for some reason (a node stops responding?)
1250            that we can just return immediately and we will reenter
1251            recovery shortly again.
1252            I.e. we deliberately leave the cluster with an inconsistent
1253            generation id to allow us to abort recovery at any stage and
1254            just restart it from scratch.
1255          */
1256         vnnmap->generation = generation;
1257         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1258         if (ret != 0) {
1259                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1260                 return -1;
1261         }
1262
1263         data.dptr = (void *)&generation;
1264         data.dsize = sizeof(uint32_t);
1265
1266         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1267         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1268                                         nodes,
1269                                         CONTROL_TIMEOUT(), false, data,
1270                                         NULL, NULL,
1271                                         NULL) != 0) {
1272                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1273                 return -1;
1274         }
1275
1276         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1277
1278         for (i=0;i<dbmap->num;i++) {
1279                 if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
1280                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1281                         return -1;
1282                 }
1283         }
1284
1285         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1286
1287         /* commit all the changes */
1288         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1289                                         nodes,
1290                                         CONTROL_TIMEOUT(), false, data,
1291                                         NULL, NULL,
1292                                         NULL) != 0) {
1293                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1294                 return -1;
1295         }
1296
1297         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1298         
1299
1300         /* update the capabilities for all nodes */
1301         ret = update_capabilities(ctdb, nodemap);
1302         if (ret!=0) {
1303                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1304                 return -1;
1305         }
1306
1307         /* build a new vnn map with all the currently active and
1308            unbanned nodes */
1309         generation = new_generation();
1310         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1311         CTDB_NO_MEMORY(ctdb, vnnmap);
1312         vnnmap->generation = generation;
1313         vnnmap->size = 0;
1314         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1315         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1316         for (i=j=0;i<nodemap->num;i++) {
1317                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1318                         continue;
1319                 }
1320                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1321                         /* this node can not be an lmaster */
1322                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1323                         continue;
1324                 }
1325
1326                 vnnmap->size++;
1327                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1328                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1329                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1330
1331         }
1332         if (vnnmap->size == 0) {
1333                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1334                 vnnmap->size++;
1335                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1336                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1337                 vnnmap->map[0] = pnn;
1338         }       
1339
1340         /* update to the new vnnmap on all nodes */
1341         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1342         if (ret != 0) {
1343                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1344                 return -1;
1345         }
1346
1347         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1348
1349         /* update recmaster to point to us for all nodes */
1350         ret = set_recovery_master(ctdb, nodemap, pnn);
1351         if (ret!=0) {
1352                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1353                 return -1;
1354         }
1355
1356         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1357
1358         /*
1359           update all nodes to have the same flags that we have
1360          */
1361         for (i=0;i<nodemap->num;i++) {
1362                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1363                         continue;
1364                 }
1365
1366                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1367                 if (ret != 0) {
1368                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1369                         return -1;
1370                 }
1371         }
1372
1373         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1374
1375         /* disable recovery mode */
1376         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
1377         if (ret != 0) {
1378                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1379                 return -1;
1380         }
1381
1382         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1383
1384         /*
1385           tell nodes to takeover their public IPs
1386          */
1387         rec->need_takeover_run = false;
1388         ret = ctdb_takeover_run(ctdb, nodemap);
1389         if (ret != 0) {
1390                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1391                 return -1;
1392         }
1393         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1394
1395         /* execute the "recovered" event script on all nodes */
1396         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1397         if (ret!=0) {
1398                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1399                 return -1;
1400         }
1401
1402         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1403
1404         /* send a message to all clients telling them that the cluster 
1405            has been reconfigured */
1406         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1407
1408         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1409
1410         rec->need_recovery = false;
1411
1412         /* We just finished a recovery successfully. 
1413            We now wait for rerecovery_timeout before we allow 
1414            another recovery to take place.
1415         */
1416         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1417         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1418         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1419
1420         return 0;
1421 }
1422
1423
1424 /*
1425   elections are won by first checking the number of connected nodes, then
1426   the priority time, then the pnn
1427  */
1428 struct election_message {
1429         uint32_t num_connected;
1430         struct timeval priority_time;
1431         uint32_t pnn;
1432         uint32_t node_flags;
1433 };
1434
1435 /*
1436   form this nodes election data
1437  */
1438 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1439 {
1440         int ret, i;
1441         struct ctdb_node_map *nodemap;
1442         struct ctdb_context *ctdb = rec->ctdb;
1443
1444         ZERO_STRUCTP(em);
1445
1446         em->pnn = rec->ctdb->pnn;
1447         em->priority_time = rec->priority_time;
1448
1449         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1450         if (ret != 0) {
1451                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1452                 return;
1453         }
1454
1455         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1456         em->node_flags = rec->node_flags;
1457
1458         for (i=0;i<nodemap->num;i++) {
1459                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1460                         em->num_connected++;
1461                 }
1462         }
1463
1464         /* we shouldnt try to win this election if we cant be a recmaster */
1465         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1466                 em->num_connected = 0;
1467                 em->priority_time = timeval_current();
1468         }
1469
1470         talloc_free(nodemap);
1471 }
1472
1473 /*
1474   see if the given election data wins
1475  */
1476 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1477 {
1478         struct election_message myem;
1479         int cmp = 0;
1480
1481         ctdb_election_data(rec, &myem);
1482
1483         /* we cant win if we dont have the recmaster capability */
1484         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1485                 return false;
1486         }
1487
1488         /* we cant win if we are banned */
1489         if (rec->node_flags & NODE_FLAGS_BANNED) {
1490                 return false;
1491         }       
1492
1493         /* we cant win if we are stopped */
1494         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1495                 return false;
1496         }       
1497
1498         /* we will automatically win if the other node is banned */
1499         if (em->node_flags & NODE_FLAGS_BANNED) {
1500                 return true;
1501         }
1502
1503         /* we will automatically win if the other node is banned */
1504         if (em->node_flags & NODE_FLAGS_STOPPED) {
1505                 return true;
1506         }
1507
1508         /* try to use the most connected node */
1509         if (cmp == 0) {
1510                 cmp = (int)myem.num_connected - (int)em->num_connected;
1511         }
1512
1513         /* then the longest running node */
1514         if (cmp == 0) {
1515                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1516         }
1517
1518         if (cmp == 0) {
1519                 cmp = (int)myem.pnn - (int)em->pnn;
1520         }
1521
1522         return cmp > 0;
1523 }
1524
1525 /*
1526   send out an election request
1527  */
1528 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1529 {
1530         int ret;
1531         TDB_DATA election_data;
1532         struct election_message emsg;
1533         uint64_t srvid;
1534         struct ctdb_context *ctdb = rec->ctdb;
1535
1536         srvid = CTDB_SRVID_RECOVERY;
1537
1538         ctdb_election_data(rec, &emsg);
1539
1540         election_data.dsize = sizeof(struct election_message);
1541         election_data.dptr  = (unsigned char *)&emsg;
1542
1543
1544         /* send an election message to all active nodes */
1545         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1546         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1547
1548
1549         /* A new node that is already frozen has entered the cluster.
1550            The existing nodes are not frozen and dont need to be frozen
1551            until the election has ended and we start the actual recovery
1552         */
1553         if (update_recmaster == true) {
1554                 /* first we assume we will win the election and set 
1555                    recoverymaster to be ourself on the current node
1556                  */
1557                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1558                 if (ret != 0) {
1559                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1560                         return -1;
1561                 }
1562         }
1563
1564
1565         return 0;
1566 }
1567
1568 /*
1569   this function will unban all nodes in the cluster
1570 */
1571 static void unban_all_nodes(struct ctdb_context *ctdb)
1572 {
1573         int ret, i;
1574         struct ctdb_node_map *nodemap;
1575         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1576         
1577         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1578         if (ret != 0) {
1579                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1580                 return;
1581         }
1582
1583         for (i=0;i<nodemap->num;i++) {
1584                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1585                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1586                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1587                 }
1588         }
1589
1590         talloc_free(tmp_ctx);
1591 }
1592
1593
1594 /*
1595   we think we are winning the election - send a broadcast election request
1596  */
1597 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1598 {
1599         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1600         int ret;
1601
1602         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1603         if (ret != 0) {
1604                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1605         }
1606
1607         talloc_free(rec->send_election_te);
1608         rec->send_election_te = NULL;
1609 }
1610
1611 /*
1612   handler for memory dumps
1613 */
1614 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1615                              TDB_DATA data, void *private_data)
1616 {
1617         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1618         TDB_DATA *dump;
1619         int ret;
1620         struct rd_memdump_reply *rd;
1621
1622         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1623                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1624                 talloc_free(tmp_ctx);
1625                 return;
1626         }
1627         rd = (struct rd_memdump_reply *)data.dptr;
1628
1629         dump = talloc_zero(tmp_ctx, TDB_DATA);
1630         if (dump == NULL) {
1631                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1632                 talloc_free(tmp_ctx);
1633                 return;
1634         }
1635         ret = ctdb_dump_memory(ctdb, dump);
1636         if (ret != 0) {
1637                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1638                 talloc_free(tmp_ctx);
1639                 return;
1640         }
1641
1642 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1643
1644         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1645         if (ret != 0) {
1646                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1647                 talloc_free(tmp_ctx);
1648                 return;
1649         }
1650
1651         talloc_free(tmp_ctx);
1652 }
1653
1654 /*
1655   handler for reload_nodes
1656 */
1657 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1658                              TDB_DATA data, void *private_data)
1659 {
1660         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1661
1662         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1663
1664         reload_nodes_file(rec->ctdb);
1665 }
1666
1667 /*
1668   handler for ip reallocate, just add it to the list of callers and 
1669   handle this later in the monitor_cluster loop so we do not recurse
1670   with other callers to takeover_run()
1671 */
1672 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1673                              TDB_DATA data, void *private_data)
1674 {
1675         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1676         struct ip_reallocate_list *caller;
1677
1678         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1679                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1680                 return;
1681         }
1682
1683         if (rec->ip_reallocate_ctx == NULL) {
1684                 rec->ip_reallocate_ctx = talloc_new(rec);
1685                 CTDB_NO_MEMORY_FATAL(ctdb, caller);
1686         }
1687
1688         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1689         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1690
1691         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1692         caller->next = rec->reallocate_callers;
1693         rec->reallocate_callers = caller;
1694
1695         return;
1696 }
1697
1698 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1699 {
1700         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1701         TDB_DATA result;
1702         int32_t ret;
1703         struct ip_reallocate_list *callers;
1704
1705         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1706         ret = ctdb_takeover_run(ctdb, rec->nodemap);
1707         result.dsize = sizeof(int32_t);
1708         result.dptr  = (uint8_t *)&ret;
1709
1710         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
1711                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to %u:%lu\n", callers->rd->pnn, callers->rd->srvid));
1712                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
1713                 if (ret != 0) {
1714                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply message to %u:%lu\n", callers->rd->pnn, callers->rd->srvid));
1715                 }
1716         }
1717
1718         talloc_free(tmp_ctx);
1719         talloc_free(rec->ip_reallocate_ctx);
1720         rec->ip_reallocate_ctx = NULL;
1721         rec->reallocate_callers = NULL;
1722         
1723 }
1724
1725
1726 /*
1727   handler for recovery master elections
1728 */
1729 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1730                              TDB_DATA data, void *private_data)
1731 {
1732         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1733         int ret;
1734         struct election_message *em = (struct election_message *)data.dptr;
1735         TALLOC_CTX *mem_ctx;
1736
1737         /* we got an election packet - update the timeout for the election */
1738         talloc_free(rec->election_timeout);
1739         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1740                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1741                                                 ctdb_election_timeout, rec);
1742
1743         mem_ctx = talloc_new(ctdb);
1744
1745         /* someone called an election. check their election data
1746            and if we disagree and we would rather be the elected node, 
1747            send a new election message to all other nodes
1748          */
1749         if (ctdb_election_win(rec, em)) {
1750                 if (!rec->send_election_te) {
1751                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1752                                                                 timeval_current_ofs(0, 500000),
1753                                                                 election_send_request, rec);
1754                 }
1755                 talloc_free(mem_ctx);
1756                 /*unban_all_nodes(ctdb);*/
1757                 return;
1758         }
1759         
1760         /* we didn't win */
1761         talloc_free(rec->send_election_te);
1762         rec->send_election_te = NULL;
1763
1764         if (ctdb->tunable.verify_recovery_lock != 0) {
1765                 /* release the recmaster lock */
1766                 if (em->pnn != ctdb->pnn &&
1767                     ctdb->recovery_lock_fd != -1) {
1768                         close(ctdb->recovery_lock_fd);
1769                         ctdb->recovery_lock_fd = -1;
1770                         unban_all_nodes(ctdb);
1771                 }
1772         }
1773
1774         /* ok, let that guy become recmaster then */
1775         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1776         if (ret != 0) {
1777                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
1778                 talloc_free(mem_ctx);
1779                 return;
1780         }
1781
1782         talloc_free(mem_ctx);
1783         return;
1784 }
1785
1786
1787 /*
1788   force the start of the election process
1789  */
1790 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1791                            struct ctdb_node_map *nodemap)
1792 {
1793         int ret;
1794         struct ctdb_context *ctdb = rec->ctdb;
1795
1796         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1797
1798         /* set all nodes to recovery mode to stop all internode traffic */
1799         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1800         if (ret != 0) {
1801                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1802                 return;
1803         }
1804
1805         talloc_free(rec->election_timeout);
1806         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1807                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1808                                                 ctdb_election_timeout, rec);
1809
1810         ret = send_election_request(rec, pnn, true);
1811         if (ret!=0) {
1812                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
1813                 return;
1814         }
1815
1816         /* wait for a few seconds to collect all responses */
1817         ctdb_wait_election(rec);
1818 }
1819
1820
1821
1822 /*
1823   handler for when a node changes its flags
1824 */
1825 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1826                             TDB_DATA data, void *private_data)
1827 {
1828         int ret;
1829         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1830         struct ctdb_node_map *nodemap=NULL;
1831         TALLOC_CTX *tmp_ctx;
1832         uint32_t changed_flags;
1833         int i;
1834         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1835
1836         if (data.dsize != sizeof(*c)) {
1837                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1838                 return;
1839         }
1840
1841         tmp_ctx = talloc_new(ctdb);
1842         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1843
1844         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1845         if (ret != 0) {
1846                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1847                 talloc_free(tmp_ctx);
1848                 return;         
1849         }
1850
1851
1852         for (i=0;i<nodemap->num;i++) {
1853                 if (nodemap->nodes[i].pnn == c->pnn) break;
1854         }
1855
1856         if (i == nodemap->num) {
1857                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1858                 talloc_free(tmp_ctx);
1859                 return;
1860         }
1861
1862         changed_flags = c->old_flags ^ c->new_flags;
1863
1864         if (nodemap->nodes[i].flags != c->new_flags) {
1865                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1866         }
1867
1868         nodemap->nodes[i].flags = c->new_flags;
1869
1870         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1871                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1872
1873         if (ret == 0) {
1874                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1875                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1876         }
1877         
1878         if (ret == 0 &&
1879             ctdb->recovery_master == ctdb->pnn &&
1880             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
1881                 /* Only do the takeover run if the perm disabled or unhealthy
1882                    flags changed since these will cause an ip failover but not
1883                    a recovery.
1884                    If the node became disconnected or banned this will also
1885                    lead to an ip address failover but that is handled 
1886                    during recovery
1887                 */
1888                 if (changed_flags & NODE_FLAGS_DISABLED) {
1889                         rec->need_takeover_run = true;
1890                 }
1891         }
1892
1893         talloc_free(tmp_ctx);
1894 }
1895
1896 /*
1897   handler for when we need to push out flag changes ot all other nodes
1898 */
1899 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1900                             TDB_DATA data, void *private_data)
1901 {
1902         int ret;
1903         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1904
1905         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), c->pnn, c->new_flags, ~c->new_flags);
1906         if (ret != 0) {
1907                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1908         }
1909 }
1910
1911
1912 struct verify_recmode_normal_data {
1913         uint32_t count;
1914         enum monitor_result status;
1915 };
1916
1917 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1918 {
1919         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
1920
1921
1922         /* one more node has responded with recmode data*/
1923         rmdata->count--;
1924
1925         /* if we failed to get the recmode, then return an error and let
1926            the main loop try again.
1927         */
1928         if (state->state != CTDB_CONTROL_DONE) {
1929                 if (rmdata->status == MONITOR_OK) {
1930                         rmdata->status = MONITOR_FAILED;
1931                 }
1932                 return;
1933         }
1934
1935         /* if we got a response, then the recmode will be stored in the
1936            status field
1937         */
1938         if (state->status != CTDB_RECOVERY_NORMAL) {
1939                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
1940                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1941         }
1942
1943         return;
1944 }
1945
1946
1947 /* verify that all nodes are in normal recovery mode */
1948 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
1949 {
1950         struct verify_recmode_normal_data *rmdata;
1951         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1952         struct ctdb_client_control_state *state;
1953         enum monitor_result status;
1954         int j;
1955         
1956         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
1957         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
1958         rmdata->count  = 0;
1959         rmdata->status = MONITOR_OK;
1960
1961         /* loop over all active nodes and send an async getrecmode call to 
1962            them*/
1963         for (j=0; j<nodemap->num; j++) {
1964                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1965                         continue;
1966                 }
1967                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
1968                                         CONTROL_TIMEOUT(), 
1969                                         nodemap->nodes[j].pnn);
1970                 if (state == NULL) {
1971                         /* we failed to send the control, treat this as 
1972                            an error and try again next iteration
1973                         */                      
1974                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
1975                         talloc_free(mem_ctx);
1976                         return MONITOR_FAILED;
1977                 }
1978
1979                 /* set up the callback functions */
1980                 state->async.fn = verify_recmode_normal_callback;
1981                 state->async.private_data = rmdata;
1982
1983                 /* one more control to wait for to complete */
1984                 rmdata->count++;
1985         }
1986
1987
1988         /* now wait for up to the maximum number of seconds allowed
1989            or until all nodes we expect a response from has replied
1990         */
1991         while (rmdata->count > 0) {
1992                 event_loop_once(ctdb->ev);
1993         }
1994
1995         status = rmdata->status;
1996         talloc_free(mem_ctx);
1997         return status;
1998 }
1999
2000
2001 struct verify_recmaster_data {
2002         struct ctdb_recoverd *rec;
2003         uint32_t count;
2004         uint32_t pnn;
2005         enum monitor_result status;
2006 };
2007
2008 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2009 {
2010         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2011
2012
2013         /* one more node has responded with recmaster data*/
2014         rmdata->count--;
2015
2016         /* if we failed to get the recmaster, then return an error and let
2017            the main loop try again.
2018         */
2019         if (state->state != CTDB_CONTROL_DONE) {
2020                 if (rmdata->status == MONITOR_OK) {
2021                         rmdata->status = MONITOR_FAILED;
2022                 }
2023                 return;
2024         }
2025
2026         /* if we got a response, then the recmaster will be stored in the
2027            status field
2028         */
2029         if (state->status != rmdata->pnn) {
2030                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2031                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2032                 rmdata->status = MONITOR_ELECTION_NEEDED;
2033         }
2034
2035         return;
2036 }
2037
2038
2039 /* verify that all nodes agree that we are the recmaster */
2040 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2041 {
2042         struct ctdb_context *ctdb = rec->ctdb;
2043         struct verify_recmaster_data *rmdata;
2044         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2045         struct ctdb_client_control_state *state;
2046         enum monitor_result status;
2047         int j;
2048         
2049         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2050         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2051         rmdata->rec    = rec;
2052         rmdata->count  = 0;
2053         rmdata->pnn    = pnn;
2054         rmdata->status = MONITOR_OK;
2055
2056         /* loop over all active nodes and send an async getrecmaster call to 
2057            them*/
2058         for (j=0; j<nodemap->num; j++) {
2059                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2060                         continue;
2061                 }
2062                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2063                                         CONTROL_TIMEOUT(),
2064                                         nodemap->nodes[j].pnn);
2065                 if (state == NULL) {
2066                         /* we failed to send the control, treat this as 
2067                            an error and try again next iteration
2068                         */                      
2069                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2070                         talloc_free(mem_ctx);
2071                         return MONITOR_FAILED;
2072                 }
2073
2074                 /* set up the callback functions */
2075                 state->async.fn = verify_recmaster_callback;
2076                 state->async.private_data = rmdata;
2077
2078                 /* one more control to wait for to complete */
2079                 rmdata->count++;
2080         }
2081
2082
2083         /* now wait for up to the maximum number of seconds allowed
2084            or until all nodes we expect a response from has replied
2085         */
2086         while (rmdata->count > 0) {
2087                 event_loop_once(ctdb->ev);
2088         }
2089
2090         status = rmdata->status;
2091         talloc_free(mem_ctx);
2092         return status;
2093 }
2094
2095
2096 /* called to check that the allocation of public ip addresses is ok.
2097 */
2098 static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
2099 {
2100         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2101         struct ctdb_all_public_ips *ips = NULL;
2102         struct ctdb_uptime *uptime1 = NULL;
2103         struct ctdb_uptime *uptime2 = NULL;
2104         int ret, j;
2105
2106         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2107                                 CTDB_CURRENT_NODE, &uptime1);
2108         if (ret != 0) {
2109                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2110                 talloc_free(mem_ctx);
2111                 return -1;
2112         }
2113
2114         /* read the ip allocation from the local node */
2115         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2116         if (ret != 0) {
2117                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2118                 talloc_free(mem_ctx);
2119                 return -1;
2120         }
2121
2122         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2123                                 CTDB_CURRENT_NODE, &uptime2);
2124         if (ret != 0) {
2125                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2126                 talloc_free(mem_ctx);
2127                 return -1;
2128         }
2129
2130         /* skip the check if the startrecovery time has changed */
2131         if (timeval_compare(&uptime1->last_recovery_started,
2132                             &uptime2->last_recovery_started) != 0) {
2133                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2134                 talloc_free(mem_ctx);
2135                 return 0;
2136         }
2137
2138         /* skip the check if the endrecovery time has changed */
2139         if (timeval_compare(&uptime1->last_recovery_finished,
2140                             &uptime2->last_recovery_finished) != 0) {
2141                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2142                 talloc_free(mem_ctx);
2143                 return 0;
2144         }
2145
2146         /* skip the check if we have started but not finished recovery */
2147         if (timeval_compare(&uptime1->last_recovery_finished,
2148                             &uptime1->last_recovery_started) != 1) {
2149                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery. skipping public ip address check\n"));
2150                 talloc_free(mem_ctx);
2151
2152                 return 0;
2153         }
2154
2155         /* verify that we have the ip addresses we should have
2156            and we dont have ones we shouldnt have.
2157            if we find an inconsistency we set recmode to
2158            active on the local node and wait for the recmaster
2159            to do a full blown recovery
2160         */
2161         for (j=0; j<ips->num; j++) {
2162                 if (ips->ips[j].pnn == pnn) {
2163                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2164                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2165                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2166                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2167                                 if (ret != 0) {
2168                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2169
2170                                         talloc_free(mem_ctx);
2171                                         return -1;
2172                                 }
2173                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2174                                 if (ret != 0) {
2175                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2176
2177                                         talloc_free(mem_ctx);
2178                                         return -1;
2179                                 }
2180                         }
2181                 } else {
2182                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2183                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2184                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2185
2186                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2187                                 if (ret != 0) {
2188                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2189
2190                                         talloc_free(mem_ctx);
2191                                         return -1;
2192                                 }
2193                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2194                                 if (ret != 0) {
2195                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2196
2197                                         talloc_free(mem_ctx);
2198                                         return -1;
2199                                 }
2200                         }
2201                 }
2202         }
2203
2204         talloc_free(mem_ctx);
2205         return 0;
2206 }
2207
2208
2209 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2210 {
2211         struct ctdb_node_map **remote_nodemaps = callback_data;
2212
2213         if (node_pnn >= ctdb->num_nodes) {
2214                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2215                 return;
2216         }
2217
2218         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2219
2220 }
2221
2222 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2223         struct ctdb_node_map *nodemap,
2224         struct ctdb_node_map **remote_nodemaps)
2225 {
2226         uint32_t *nodes;
2227
2228         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2229         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2230                                         nodes,
2231                                         CONTROL_TIMEOUT(), false, tdb_null,
2232                                         async_getnodemap_callback,
2233                                         NULL,
2234                                         remote_nodemaps) != 0) {
2235                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2236
2237                 return -1;
2238         }
2239
2240         return 0;
2241 }
2242
2243 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2244 struct ctdb_check_reclock_state {
2245         struct ctdb_context *ctdb;
2246         struct timeval start_time;
2247         int fd[2];
2248         pid_t child;
2249         struct timed_event *te;
2250         struct fd_event *fde;
2251         enum reclock_child_status status;
2252 };
2253
2254 /* when we free the reclock state we must kill any child process.
2255 */
2256 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2257 {
2258         struct ctdb_context *ctdb = state->ctdb;
2259
2260         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2261
2262         if (state->fd[0] != -1) {
2263                 close(state->fd[0]);
2264                 state->fd[0] = -1;
2265         }
2266         if (state->fd[1] != -1) {
2267                 close(state->fd[1]);
2268                 state->fd[1] = -1;
2269         }
2270         kill(state->child, SIGKILL);
2271         return 0;
2272 }
2273
2274 /*
2275   called if our check_reclock child times out. this would happen if
2276   i/o to the reclock file blocks.
2277  */
2278 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2279                                          struct timeval t, void *private_data)
2280 {
2281         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2282                                            struct ctdb_check_reclock_state);
2283
2284         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2285         state->status = RECLOCK_TIMEOUT;
2286 }
2287
2288 /* this is called when the child process has completed checking the reclock
2289    file and has written data back to us through the pipe.
2290 */
2291 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2292                              uint16_t flags, void *private_data)
2293 {
2294         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2295                                              struct ctdb_check_reclock_state);
2296         char c = 0;
2297         int ret;
2298
2299         /* we got a response from our child process so we can abort the
2300            timeout.
2301         */
2302         talloc_free(state->te);
2303         state->te = NULL;
2304
2305         ret = read(state->fd[0], &c, 1);
2306         if (ret != 1 || c != RECLOCK_OK) {
2307                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2308                 state->status = RECLOCK_FAILED;
2309
2310                 return;
2311         }
2312
2313         state->status = RECLOCK_OK;
2314         return;
2315 }
2316
2317 static int check_recovery_lock(struct ctdb_context *ctdb)
2318 {
2319         int ret;
2320         struct ctdb_check_reclock_state *state;
2321         pid_t parent = getpid();
2322
2323         if (ctdb->recovery_lock_fd == -1) {
2324                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2325                 return -1;
2326         }
2327
2328         state = talloc(ctdb, struct ctdb_check_reclock_state);
2329         CTDB_NO_MEMORY(ctdb, state);
2330
2331         state->ctdb = ctdb;
2332         state->start_time = timeval_current();
2333         state->status = RECLOCK_CHECKING;
2334         state->fd[0] = -1;
2335         state->fd[1] = -1;
2336
2337         ret = pipe(state->fd);
2338         if (ret != 0) {
2339                 talloc_free(state);
2340                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2341                 return -1;
2342         }
2343
2344         state->child = fork();
2345         if (state->child == (pid_t)-1) {
2346                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2347                 close(state->fd[0]);
2348                 state->fd[0] = -1;
2349                 close(state->fd[1]);
2350                 state->fd[1] = -1;
2351                 talloc_free(state);
2352                 return -1;
2353         }
2354
2355         if (state->child == 0) {
2356                 char cc = RECLOCK_OK;
2357                 close(state->fd[0]);
2358                 state->fd[0] = -1;
2359
2360                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2361                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2362                         cc = RECLOCK_FAILED;
2363                 }
2364
2365                 write(state->fd[1], &cc, 1);
2366                 /* make sure we die when our parent dies */
2367                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2368                         sleep(5);
2369                         write(state->fd[1], &cc, 1);
2370                 }
2371                 _exit(0);
2372         }
2373         close(state->fd[1]);
2374         state->fd[1] = -1;
2375
2376         talloc_set_destructor(state, check_reclock_destructor);
2377
2378         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2379                                     ctdb_check_reclock_timeout, state);
2380         if (state->te == NULL) {
2381                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2382                 talloc_free(state);
2383                 return -1;
2384         }
2385
2386         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2387                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2388                                 reclock_child_handler,
2389                                 (void *)state);
2390
2391         if (state->fde == NULL) {
2392                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2393                 talloc_free(state);
2394                 return -1;
2395         }
2396
2397         while (state->status == RECLOCK_CHECKING) {
2398                 event_loop_once(ctdb->ev);
2399         }
2400
2401         if (state->status == RECLOCK_FAILED) {
2402                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2403                 close(ctdb->recovery_lock_fd);
2404                 ctdb->recovery_lock_fd = -1;
2405                 talloc_free(state);
2406                 return -1;
2407         }
2408
2409         talloc_free(state);
2410         return 0;
2411 }
2412
2413 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2414 {
2415         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2416         const char *reclockfile;
2417
2418         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2419                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2420                 talloc_free(tmp_ctx);
2421                 return -1;      
2422         }
2423
2424         if (reclockfile == NULL) {
2425                 if (ctdb->recovery_lock_file != NULL) {
2426                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2427                         talloc_free(ctdb->recovery_lock_file);
2428                         ctdb->recovery_lock_file = NULL;
2429                         if (ctdb->recovery_lock_fd != -1) {
2430                                 close(ctdb->recovery_lock_fd);
2431                                 ctdb->recovery_lock_fd = -1;
2432                         }
2433                 }
2434                 ctdb->tunable.verify_recovery_lock = 0;
2435                 talloc_free(tmp_ctx);
2436                 return 0;
2437         }
2438
2439         if (ctdb->recovery_lock_file == NULL) {
2440                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2441                 if (ctdb->recovery_lock_fd != -1) {
2442                         close(ctdb->recovery_lock_fd);
2443                         ctdb->recovery_lock_fd = -1;
2444                 }
2445                 talloc_free(tmp_ctx);
2446                 return 0;
2447         }
2448
2449
2450         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2451                 talloc_free(tmp_ctx);
2452                 return 0;
2453         }
2454
2455         talloc_free(ctdb->recovery_lock_file);
2456         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2457         ctdb->tunable.verify_recovery_lock = 0;
2458         if (ctdb->recovery_lock_fd != -1) {
2459                 close(ctdb->recovery_lock_fd);
2460                 ctdb->recovery_lock_fd = -1;
2461         }
2462
2463         talloc_free(tmp_ctx);
2464         return 0;
2465 }
2466                 
2467 /*
2468   the main monitoring loop
2469  */
2470 static void monitor_cluster(struct ctdb_context *ctdb)
2471 {
2472         uint32_t pnn;
2473         TALLOC_CTX *mem_ctx=NULL;
2474         struct ctdb_node_map *nodemap=NULL;
2475         struct ctdb_node_map *recmaster_nodemap=NULL;
2476         struct ctdb_node_map **remote_nodemaps=NULL;
2477         struct ctdb_vnn_map *vnnmap=NULL;
2478         struct ctdb_vnn_map *remote_vnnmap=NULL;
2479         int32_t debug_level;
2480         int i, j, ret;
2481         struct ctdb_recoverd *rec;
2482
2483         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2484
2485         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2486         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2487
2488         rec->ctdb = ctdb;
2489
2490         rec->priority_time = timeval_current();
2491
2492         /* register a message port for sending memory dumps */
2493         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2494
2495         /* register a message port for recovery elections */
2496         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2497
2498         /* when nodes are disabled/enabled */
2499         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2500
2501         /* when we are asked to puch out a flag change */
2502         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2503
2504         /* register a message port for vacuum fetch */
2505         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2506
2507         /* register a message port for reloadnodes  */
2508         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2509
2510         /* register a message port for performing a takeover run */
2511         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2512
2513 again:
2514         if (mem_ctx) {
2515                 talloc_free(mem_ctx);
2516                 mem_ctx = NULL;
2517         }
2518         mem_ctx = talloc_new(ctdb);
2519         if (!mem_ctx) {
2520                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2521                 exit(-1);
2522         }
2523
2524         /* we only check for recovery once every second */
2525         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2526
2527         /* verify that the main daemon is still running */
2528         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2529                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2530                 exit(-1);
2531         }
2532
2533         /* ping the local daemon to tell it we are alive */
2534         ctdb_ctrl_recd_ping(ctdb);
2535
2536         if (rec->election_timeout) {
2537                 /* an election is in progress */
2538                 goto again;
2539         }
2540
2541         /* read the debug level from the parent and update locally */
2542         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2543         if (ret !=0) {
2544                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2545                 goto again;
2546         }
2547         LogLevel = debug_level;
2548
2549
2550         /* We must check if we need to ban a node here but we want to do this
2551            as early as possible so we dont wait until we have pulled the node
2552            map from the local node. thats why we have the hardcoded value 20
2553         */
2554         for (i=0; i<ctdb->num_nodes; i++) {
2555                 struct ctdb_banning_state *ban_state;
2556
2557                 if (ctdb->nodes[i]->ban_state == NULL) {
2558                         continue;
2559                 }
2560                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2561                 if (ban_state->count < 20) {
2562                         continue;
2563                 }
2564                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2565                         ctdb->nodes[i]->pnn, ban_state->count,
2566                         ctdb->tunable.recovery_ban_period));
2567                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2568                 ban_state->count = 0;
2569         }
2570
2571         /* get relevant tunables */
2572         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2573         if (ret != 0) {
2574                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2575                 goto again;
2576         }
2577
2578         /* get the current recovery lock file from the server */
2579         if (update_recovery_lock_file(ctdb) != 0) {
2580                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2581                 goto again;
2582         }
2583
2584         /* Make sure that if recovery lock verification becomes disabled when
2585            we close the file
2586         */
2587         if (ctdb->tunable.verify_recovery_lock == 0) {
2588                 if (ctdb->recovery_lock_fd != -1) {
2589                         close(ctdb->recovery_lock_fd);
2590                         ctdb->recovery_lock_fd = -1;
2591                 }
2592         }
2593
2594         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2595         if (pnn == (uint32_t)-1) {
2596                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2597                 goto again;
2598         }
2599
2600         /* get the vnnmap */
2601         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2602         if (ret != 0) {
2603                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2604                 goto again;
2605         }
2606
2607
2608         /* get number of nodes */
2609         if (rec->nodemap) {
2610                 talloc_free(rec->nodemap);
2611                 rec->nodemap = NULL;
2612                 nodemap=NULL;
2613         }
2614         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2615         if (ret != 0) {
2616                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2617                 goto again;
2618         }
2619         nodemap = rec->nodemap;
2620
2621         /* check which node is the recovery master */
2622         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2623         if (ret != 0) {
2624                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2625                 goto again;
2626         }
2627
2628         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
2629         if (rec->recmaster != pnn) {
2630                 if (rec->ip_reallocate_ctx != NULL) {
2631                         talloc_free(rec->ip_reallocate_ctx);
2632                         rec->ip_reallocate_ctx = NULL;
2633                         rec->reallocate_callers = NULL;
2634                 }
2635         }
2636         /* if there are takeovers requested, perform it and notify the waiters */
2637         if (rec->reallocate_callers) {
2638                 process_ipreallocate_requests(ctdb, rec);
2639         }
2640
2641         if (rec->recmaster == (uint32_t)-1) {
2642                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2643                 force_election(rec, pnn, nodemap);
2644                 goto again;
2645         }
2646
2647
2648         /* if the local daemon is STOPPED, we verify that the databases are
2649            also frozen and thet the recmode is set to active 
2650         */
2651         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
2652                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2653                 if (ret != 0) {
2654                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
2655                 }
2656                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2657                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
2658
2659                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2660                         if (ret != 0) {
2661                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
2662                                 goto again;
2663                         }
2664                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2665                         if (ret != 0) {
2666                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
2667
2668                                 goto again;
2669                         }
2670                         goto again;
2671                 }
2672         }
2673         /* If the local node is stopped, verify we are not the recmaster 
2674            and yield this role if so
2675         */
2676         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
2677                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
2678                 force_election(rec, pnn, nodemap);
2679                 goto again;
2680         }
2681         
2682         /* check that we (recovery daemon) and the local ctdb daemon
2683            agrees on whether we are banned or not
2684         */
2685 //qqq
2686
2687         /* remember our own node flags */
2688         rec->node_flags = nodemap->nodes[pnn].flags;
2689
2690         /* count how many active nodes there are */
2691         rec->num_active    = 0;
2692         rec->num_connected = 0;
2693         for (i=0; i<nodemap->num; i++) {
2694                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2695                         rec->num_active++;
2696                 }
2697                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2698                         rec->num_connected++;
2699                 }
2700         }
2701
2702
2703         /* verify that the recmaster node is still active */
2704         for (j=0; j<nodemap->num; j++) {
2705                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2706                         break;
2707                 }
2708         }
2709
2710         if (j == nodemap->num) {
2711                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2712                 force_election(rec, pnn, nodemap);
2713                 goto again;
2714         }
2715
2716         /* if recovery master is disconnected we must elect a new recmaster */
2717         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2718                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2719                 force_election(rec, pnn, nodemap);
2720                 goto again;
2721         }
2722
2723         /* grap the nodemap from the recovery master to check if it is banned */
2724         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2725                                    mem_ctx, &recmaster_nodemap);
2726         if (ret != 0) {
2727                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2728                           nodemap->nodes[j].pnn));
2729                 goto again;
2730         }
2731
2732
2733         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2734                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2735                 force_election(rec, pnn, nodemap);
2736                 goto again;
2737         }
2738
2739
2740         /* verify that we have all ip addresses we should have and we dont
2741          * have addresses we shouldnt have.
2742          */ 
2743         if (ctdb->do_checkpublicip) {
2744                 if (verify_ip_allocation(ctdb, pnn) != 0) {
2745                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
2746                         goto again;
2747                 }
2748         }
2749
2750
2751         /* if we are not the recmaster then we do not need to check
2752            if recovery is needed
2753          */
2754         if (pnn != rec->recmaster) {
2755                 goto again;
2756         }
2757
2758
2759         /* ensure our local copies of flags are right */
2760         ret = update_local_flags(rec, nodemap);
2761         if (ret == MONITOR_ELECTION_NEEDED) {
2762                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2763                 force_election(rec, pnn, nodemap);
2764                 goto again;
2765         }
2766         if (ret != MONITOR_OK) {
2767                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2768                 goto again;
2769         }
2770
2771         /* update the list of public ips that a node can handle for
2772            all connected nodes
2773         */
2774         if (ctdb->num_nodes != nodemap->num) {
2775                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2776                 reload_nodes_file(ctdb);
2777                 goto again;
2778         }
2779         for (j=0; j<nodemap->num; j++) {
2780                 /* release any existing data */
2781                 if (ctdb->nodes[j]->public_ips) {
2782                         talloc_free(ctdb->nodes[j]->public_ips);
2783                         ctdb->nodes[j]->public_ips = NULL;
2784                 }
2785
2786                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2787                         continue;
2788                 }
2789
2790                 /* grab a new shiny list of public ips from the node */
2791                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
2792                         ctdb->nodes[j]->pnn, 
2793                         ctdb->nodes,
2794                         &ctdb->nodes[j]->public_ips)) {
2795                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
2796                                 ctdb->nodes[j]->pnn));
2797                         goto again;
2798                 }
2799         }
2800
2801
2802         /* verify that all active nodes agree that we are the recmaster */
2803         switch (verify_recmaster(rec, nodemap, pnn)) {
2804         case MONITOR_RECOVERY_NEEDED:
2805                 /* can not happen */
2806                 goto again;
2807         case MONITOR_ELECTION_NEEDED:
2808                 force_election(rec, pnn, nodemap);
2809                 goto again;
2810         case MONITOR_OK:
2811                 break;
2812         case MONITOR_FAILED:
2813                 goto again;
2814         }
2815
2816
2817         if (rec->need_recovery) {
2818                 /* a previous recovery didn't finish */
2819                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2820                 goto again;             
2821         }
2822
2823         /* verify that all active nodes are in normal mode 
2824            and not in recovery mode 
2825         */
2826         switch (verify_recmode(ctdb, nodemap)) {
2827         case MONITOR_RECOVERY_NEEDED:
2828                 ctdb_set_culprit(rec, ctdb->pnn);
2829                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2830                 goto again;
2831         case MONITOR_FAILED:
2832                 goto again;
2833         case MONITOR_ELECTION_NEEDED:
2834                 /* can not happen */
2835         case MONITOR_OK:
2836                 break;
2837         }
2838
2839
2840         if (ctdb->tunable.verify_recovery_lock != 0) {
2841                 /* we should have the reclock - check its not stale */
2842                 ret = check_recovery_lock(ctdb);
2843                 if (ret != 0) {
2844                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
2845                         ctdb_set_culprit(rec, ctdb->pnn);
2846                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2847                         goto again;
2848                 }
2849         }
2850
2851         /* get the nodemap for all active remote nodes
2852          */
2853         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
2854         if (remote_nodemaps == NULL) {
2855                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
2856                 goto again;
2857         }
2858         for(i=0; i<nodemap->num; i++) {
2859                 remote_nodemaps[i] = NULL;
2860         }
2861         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
2862                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
2863                 goto again;
2864         } 
2865
2866         /* verify that all other nodes have the same nodemap as we have
2867         */
2868         for (j=0; j<nodemap->num; j++) {
2869                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2870                         continue;
2871                 }
2872
2873                 if (remote_nodemaps[j] == NULL) {
2874                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
2875                         ctdb_set_culprit(rec, j);
2876
2877                         goto again;
2878                 }
2879
2880                 /* if the nodes disagree on how many nodes there are
2881                    then this is a good reason to try recovery
2882                  */
2883                 if (remote_nodemaps[j]->num != nodemap->num) {
2884                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2885                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
2886                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2887                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2888                         goto again;
2889                 }
2890
2891                 /* if the nodes disagree on which nodes exist and are
2892                    active, then that is also a good reason to do recovery
2893                  */
2894                 for (i=0;i<nodemap->num;i++) {
2895                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
2896                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2897                                           nodemap->nodes[j].pnn, i, 
2898                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
2899                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2900                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2901                                             vnnmap);
2902                                 goto again;
2903                         }
2904                 }
2905
2906                 /* verify the flags are consistent
2907                 */
2908                 for (i=0; i<nodemap->num; i++) {
2909                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2910                                 continue;
2911                         }
2912                         
2913                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
2914                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
2915                                   nodemap->nodes[j].pnn, 
2916                                   nodemap->nodes[i].pnn, 
2917                                   remote_nodemaps[j]->nodes[i].flags,
2918                                   nodemap->nodes[j].flags));
2919                                 if (i == j) {
2920                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
2921                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
2922                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2923                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2924                                                     vnnmap);
2925                                         goto again;
2926                                 } else {
2927                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
2928                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
2929                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2930                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2931                                                     vnnmap);
2932                                         goto again;
2933                                 }
2934                         }
2935                 }
2936         }
2937
2938
2939         /* there better be the same number of lmasters in the vnn map
2940            as there are active nodes or we will have to do a recovery
2941          */
2942         if (vnnmap->size != rec->num_active) {
2943                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
2944                           vnnmap->size, rec->num_active));
2945                 ctdb_set_culprit(rec, ctdb->pnn);
2946                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2947                 goto again;
2948         }
2949
2950         /* verify that all active nodes in the nodemap also exist in 
2951            the vnnmap.
2952          */
2953         for (j=0; j<nodemap->num; j++) {
2954                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2955                         continue;
2956                 }
2957                 if (nodemap->nodes[j].pnn == pnn) {
2958                         continue;
2959                 }
2960
2961                 for (i=0; i<vnnmap->size; i++) {
2962                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2963                                 break;
2964                         }
2965                 }
2966                 if (i == vnnmap->size) {
2967                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
2968                                   nodemap->nodes[j].pnn));
2969                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
2970                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2971                         goto again;
2972                 }
2973         }
2974
2975         
2976         /* verify that all other nodes have the same vnnmap
2977            and are from the same generation
2978          */
2979         for (j=0; j<nodemap->num; j++) {
2980                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2981                         continue;
2982                 }
2983                 if (nodemap->nodes[j].pnn == pnn) {
2984                         continue;
2985                 }
2986
2987                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2988                                           mem_ctx, &remote_vnnmap);
2989                 if (ret != 0) {
2990                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
2991                                   nodemap->nodes[j].pnn));
2992                         goto again;
2993                 }
2994
2995                 /* verify the vnnmap generation is the same */
2996                 if (vnnmap->generation != remote_vnnmap->generation) {
2997                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
2998                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
2999                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3000                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3001                         goto again;
3002                 }
3003
3004                 /* verify the vnnmap size is the same */
3005                 if (vnnmap->size != remote_vnnmap->size) {
3006                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3007                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3008                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3009                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3010                         goto again;
3011                 }
3012
3013                 /* verify the vnnmap is the same */
3014                 for (i=0;i<vnnmap->size;i++) {
3015                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3016                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3017                                           nodemap->nodes[j].pnn));
3018                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3019                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3020                                             vnnmap);
3021                                 goto again;
3022                         }
3023                 }
3024         }
3025
3026         /* we might need to change who has what IP assigned */
3027         if (rec->need_takeover_run) {
3028                 rec->need_takeover_run = false;
3029
3030                 /* execute the "startrecovery" event script on all nodes */
3031                 ret = run_startrecovery_eventscript(rec, nodemap);
3032                 if (ret!=0) {
3033                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3034                         ctdb_set_culprit(rec, ctdb->pnn);
3035                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3036                 }
3037
3038                 ret = ctdb_takeover_run(ctdb, nodemap);
3039                 if (ret != 0) {
3040                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3041                         ctdb_set_culprit(rec, ctdb->pnn);
3042                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3043                 }
3044
3045                 /* execute the "recovered" event script on all nodes */
3046                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3047 #if 0
3048 // we cant check whether the event completed successfully
3049 // since this script WILL fail if the node is in recovery mode
3050 // and if that race happens, the code here would just cause a second
3051 // cascading recovery.
3052                 if (ret!=0) {
3053                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3054                         ctdb_set_culprit(rec, ctdb->pnn);
3055                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3056                 }
3057 #endif
3058         }
3059
3060
3061         goto again;
3062
3063 }
3064
3065 /*
3066   event handler for when the main ctdbd dies
3067  */
3068 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3069                                  uint16_t flags, void *private_data)
3070 {
3071         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3072         _exit(1);
3073 }
3074
3075 /*
3076   called regularly to verify that the recovery daemon is still running
3077  */
3078 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3079                               struct timeval yt, void *p)
3080 {
3081         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3082
3083         if (kill(ctdb->recoverd_pid, 0) != 0) {
3084                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3085
3086                 ctdb_stop_recoverd(ctdb);
3087                 ctdb_stop_keepalive(ctdb);
3088                 ctdb_stop_monitoring(ctdb);
3089                 ctdb_release_all_ips(ctdb);
3090                 if (ctdb->methods != NULL) {
3091                         ctdb->methods->shutdown(ctdb);
3092                 }
3093                 ctdb_event_script(ctdb, "shutdown");
3094
3095                 exit(10);       
3096         }
3097
3098         event_add_timed(ctdb->ev, ctdb, 
3099                         timeval_current_ofs(30, 0),
3100                         ctdb_check_recd, ctdb);
3101 }
3102
3103 static void recd_sig_child_handler(struct event_context *ev,
3104         struct signal_event *se, int signum, int count,
3105         void *dont_care, 
3106         void *private_data)
3107 {
3108 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3109         int status;
3110         pid_t pid = -1;
3111
3112         while (pid != 0) {
3113                 pid = waitpid(-1, &status, WNOHANG);
3114                 if (pid == -1) {
3115                         if (errno != ECHILD) {
3116                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3117                         }
3118                         return;
3119                 }
3120                 if (pid > 0) {
3121                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3122                 }
3123         }
3124 }
3125
3126 /*
3127   startup the recovery daemon as a child of the main ctdb daemon
3128  */
3129 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3130 {
3131         int fd[2];
3132         struct signal_event *se;
3133
3134         if (pipe(fd) != 0) {
3135                 return -1;
3136         }
3137
3138         ctdb->ctdbd_pid = getpid();
3139
3140         ctdb->recoverd_pid = fork();
3141         if (ctdb->recoverd_pid == -1) {
3142                 return -1;
3143         }
3144         
3145         if (ctdb->recoverd_pid != 0) {
3146                 close(fd[0]);
3147                 event_add_timed(ctdb->ev, ctdb, 
3148                                 timeval_current_ofs(30, 0),
3149                                 ctdb_check_recd, ctdb);
3150                 return 0;
3151         }
3152
3153         close(fd[1]);
3154
3155         srandom(getpid() ^ time(NULL));
3156
3157         if (switch_from_server_to_client(ctdb) != 0) {
3158                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3159                 exit(1);
3160         }
3161
3162         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3163                      ctdb_recoverd_parent, &fd[0]);     
3164
3165         /* set up a handler to pick up sigchld */
3166         se = event_add_signal(ctdb->ev, ctdb,
3167                                      SIGCHLD, 0,
3168                                      recd_sig_child_handler,
3169                                      ctdb);
3170         if (se == NULL) {
3171                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3172                 exit(1);
3173         }
3174
3175         monitor_cluster(ctdb);
3176
3177         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3178         return -1;
3179 }
3180
3181 /*
3182   shutdown the recovery daemon
3183  */
3184 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3185 {
3186         if (ctdb->recoverd_pid == 0) {
3187                 return;
3188         }
3189
3190         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3191         kill(ctdb->recoverd_pid, SIGTERM);
3192 }