inew version 1.0.66
[sahlberg/ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 struct ban_state {
35         struct ctdb_recoverd *rec;
36         uint32_t banned_node;
37 };
38
39 /*
40   private state of recovery daemon
41  */
42 struct ctdb_recoverd {
43         struct ctdb_context *ctdb;
44         uint32_t recmaster;
45         uint32_t num_active;
46         uint32_t num_connected;
47         struct ctdb_node_map *nodemap;
48         uint32_t last_culprit;
49         uint32_t culprit_counter;
50         struct timeval first_recover_time;
51         struct ban_state **banned_nodes;
52         struct timeval priority_time;
53         bool need_takeover_run;
54         bool need_recovery;
55         uint32_t node_flags;
56         struct timed_event *send_election_te;
57         struct timed_event *election_timeout;
58         struct vacuum_info *vacuum_info;
59 };
60
61 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
62 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
63
64
65 /*
66   unban a node
67  */
68 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
69 {
70         struct ctdb_context *ctdb = rec->ctdb;
71
72         DEBUG(DEBUG_NOTICE,("Unbanning node %u\n", pnn));
73
74         if (!ctdb_validate_pnn(ctdb, pnn)) {
75                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_unban_node\n", pnn));
76                 return;
77         }
78
79         /* If we are unbanning a different node then just pass the ban info on */
80         if (pnn != ctdb->pnn) {
81                 TDB_DATA data;
82                 int ret;
83                 
84                 DEBUG(DEBUG_NOTICE,("Unanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
85
86                 data.dptr = (uint8_t *)&pnn;
87                 data.dsize = sizeof(uint32_t);
88
89                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_UNBAN_NODE, data);
90                 if (ret != 0) {
91                         DEBUG(DEBUG_ERR,("Failed to unban node %u\n", pnn));
92                         return;
93                 }
94
95                 return;
96         }
97
98         /* make sure we remember we are no longer banned in case 
99            there is an election */
100         rec->node_flags &= ~NODE_FLAGS_BANNED;
101
102         DEBUG(DEBUG_INFO,("Clearing ban flag on node %u\n", pnn));
103         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
104
105         if (rec->banned_nodes[pnn] == NULL) {
106                 DEBUG(DEBUG_INFO,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
107                 return;
108         }
109
110         talloc_free(rec->banned_nodes[pnn]);
111         rec->banned_nodes[pnn] = NULL;
112 }
113
114
115 /*
116   called when a ban has timed out
117  */
118 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
119 {
120         struct ban_state *state = talloc_get_type(p, struct ban_state);
121         struct ctdb_recoverd *rec = state->rec;
122         uint32_t pnn = state->banned_node;
123
124         DEBUG(DEBUG_NOTICE,("Ban timeout. Node %u is now unbanned\n", pnn));
125         ctdb_unban_node(rec, pnn);
126 }
127
128 /*
129   ban a node for a period of time
130  */
131 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
132 {
133         struct ctdb_context *ctdb = rec->ctdb;
134
135         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
136
137         if (!ctdb_validate_pnn(ctdb, pnn)) {
138                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
139                 return;
140         }
141
142         if (0 == ctdb->tunable.enable_bans) {
143                 DEBUG(DEBUG_INFO,("Bans are disabled - ignoring ban of node %u\n", pnn));
144                 return;
145         }
146
147         /* If we are banning a different node then just pass the ban info on */
148         if (pnn != ctdb->pnn) {
149                 struct ctdb_ban_info b;
150                 TDB_DATA data;
151                 int ret;
152                 
153                 DEBUG(DEBUG_NOTICE,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
154
155                 b.pnn = pnn;
156                 b.ban_time = ban_time;
157
158                 data.dptr = (uint8_t *)&b;
159                 data.dsize = sizeof(b);
160
161                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_BAN_NODE, data);
162                 if (ret != 0) {
163                         DEBUG(DEBUG_ERR,("Failed to ban node %u\n", pnn));
164                         return;
165                 }
166
167                 return;
168         }
169
170         DEBUG(DEBUG_NOTICE,("self ban - lowering our election priority\n"));
171         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
172
173         /* banning ourselves - lower our election priority */
174         rec->priority_time = timeval_current();
175
176         /* make sure we remember we are banned in case there is an 
177            election */
178         rec->node_flags |= NODE_FLAGS_BANNED;
179
180         if (rec->banned_nodes[pnn] != NULL) {
181                 DEBUG(DEBUG_NOTICE,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));            
182                 talloc_free(rec->banned_nodes[pnn]);
183                 rec->banned_nodes[pnn] = NULL;
184         }
185
186         rec->banned_nodes[pnn] = talloc(rec->banned_nodes, struct ban_state);
187         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
188
189         rec->banned_nodes[pnn]->rec = rec;
190         rec->banned_nodes[pnn]->banned_node = pnn;
191
192         if (ban_time != 0) {
193                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
194                                 timeval_current_ofs(ban_time, 0),
195                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
196         }
197 }
198
199 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
200
201
202 /*
203   run the "recovered" eventscript on all nodes
204  */
205 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
206 {
207         TALLOC_CTX *tmp_ctx;
208         uint32_t *nodes;
209
210         tmp_ctx = talloc_new(ctdb);
211         CTDB_NO_MEMORY(ctdb, tmp_ctx);
212
213         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
214         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
215                                         nodes,
216                                         CONTROL_TIMEOUT(), false, tdb_null,
217                                         NULL, NULL,
218                                         NULL) != 0) {
219                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
220
221                 talloc_free(tmp_ctx);
222                 return -1;
223         }
224
225         talloc_free(tmp_ctx);
226         return 0;
227 }
228
229 /*
230   remember the trouble maker
231  */
232 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
233 {
234         struct ctdb_context *ctdb = rec->ctdb;
235
236         if (rec->last_culprit != culprit ||
237             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
238                 DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
239                 /* either a new node is the culprit, or we've decided to forgive them */
240                 rec->last_culprit = culprit;
241                 rec->first_recover_time = timeval_current();
242                 rec->culprit_counter = 0;
243         }
244         rec->culprit_counter++;
245 }
246
247
248 /* this callback is called for every node that failed to execute the
249    start recovery event
250 */
251 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
252 {
253         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
254
255         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
256
257         ctdb_set_culprit(rec, node_pnn);
258 }
259
260 /*
261   run the "startrecovery" eventscript on all nodes
262  */
263 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
264 {
265         TALLOC_CTX *tmp_ctx;
266         uint32_t *nodes;
267         struct ctdb_context *ctdb = rec->ctdb;
268
269         tmp_ctx = talloc_new(ctdb);
270         CTDB_NO_MEMORY(ctdb, tmp_ctx);
271
272         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
273         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
274                                         nodes,
275                                         CONTROL_TIMEOUT(), false, tdb_null,
276                                         NULL,
277                                         startrecovery_fail_callback,
278                                         rec) != 0) {
279                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
280                 talloc_free(tmp_ctx);
281                 return -1;
282         }
283
284         talloc_free(tmp_ctx);
285         return 0;
286 }
287
288 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
289 {
290         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
291                 DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
292                 return;
293         }
294         if (node_pnn < ctdb->num_nodes) {
295                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
296         }
297 }
298
299 /*
300   update the node capabilities for all connected nodes
301  */
302 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
303 {
304         uint32_t *nodes;
305         TALLOC_CTX *tmp_ctx;
306
307         tmp_ctx = talloc_new(ctdb);
308         CTDB_NO_MEMORY(ctdb, tmp_ctx);
309
310         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
311         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
312                                         nodes, CONTROL_TIMEOUT(),
313                                         false, tdb_null,
314                                         async_getcap_callback, NULL,
315                                         NULL) != 0) {
316                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
317                 talloc_free(tmp_ctx);
318                 return -1;
319         }
320
321         talloc_free(tmp_ctx);
322         return 0;
323 }
324
325 /*
326   change recovery mode on all nodes
327  */
328 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
329 {
330         TDB_DATA data;
331         uint32_t *nodes;
332         TALLOC_CTX *tmp_ctx;
333
334         tmp_ctx = talloc_new(ctdb);
335         CTDB_NO_MEMORY(ctdb, tmp_ctx);
336
337         /* freeze all nodes */
338         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
339         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
340                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
341                                                 nodes, CONTROL_TIMEOUT(),
342                                                 false, tdb_null,
343                                                 NULL, NULL,
344                                                 NULL) != 0) {
345                         DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
346                         talloc_free(tmp_ctx);
347                         return -1;
348                 }
349         }
350
351
352         data.dsize = sizeof(uint32_t);
353         data.dptr = (unsigned char *)&rec_mode;
354
355         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
356                                         nodes, CONTROL_TIMEOUT(),
357                                         false, data,
358                                         NULL, NULL,
359                                         NULL) != 0) {
360                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
361                 talloc_free(tmp_ctx);
362                 return -1;
363         }
364
365         talloc_free(tmp_ctx);
366         return 0;
367 }
368
369 /*
370   change recovery master on all node
371  */
372 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
373 {
374         TDB_DATA data;
375         TALLOC_CTX *tmp_ctx;
376         uint32_t *nodes;
377
378         tmp_ctx = talloc_new(ctdb);
379         CTDB_NO_MEMORY(ctdb, tmp_ctx);
380
381         data.dsize = sizeof(uint32_t);
382         data.dptr = (unsigned char *)&pnn;
383
384         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
385         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
386                                         nodes,
387                                         CONTROL_TIMEOUT(), false, data,
388                                         NULL, NULL,
389                                         NULL) != 0) {
390                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
391                 talloc_free(tmp_ctx);
392                 return -1;
393         }
394
395         talloc_free(tmp_ctx);
396         return 0;
397 }
398
399
400 /*
401   ensure all other nodes have attached to any databases that we have
402  */
403 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
404                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
405 {
406         int i, j, db, ret;
407         struct ctdb_dbid_map *remote_dbmap;
408
409         /* verify that all other nodes have all our databases */
410         for (j=0; j<nodemap->num; j++) {
411                 /* we dont need to ourself ourselves */
412                 if (nodemap->nodes[j].pnn == pnn) {
413                         continue;
414                 }
415                 /* dont check nodes that are unavailable */
416                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
417                         continue;
418                 }
419
420                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
421                                          mem_ctx, &remote_dbmap);
422                 if (ret != 0) {
423                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
424                         return -1;
425                 }
426
427                 /* step through all local databases */
428                 for (db=0; db<dbmap->num;db++) {
429                         const char *name;
430
431
432                         for (i=0;i<remote_dbmap->num;i++) {
433                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
434                                         break;
435                                 }
436                         }
437                         /* the remote node already have this database */
438                         if (i!=remote_dbmap->num) {
439                                 continue;
440                         }
441                         /* ok so we need to create this database */
442                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
443                                             mem_ctx, &name);
444                         if (ret != 0) {
445                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
446                                 return -1;
447                         }
448                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
449                                            mem_ctx, name, dbmap->dbs[db].persistent);
450                         if (ret != 0) {
451                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
452                                 return -1;
453                         }
454                 }
455         }
456
457         return 0;
458 }
459
460
461 /*
462   ensure we are attached to any databases that anyone else is attached to
463  */
464 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
465                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
466 {
467         int i, j, db, ret;
468         struct ctdb_dbid_map *remote_dbmap;
469
470         /* verify that we have all database any other node has */
471         for (j=0; j<nodemap->num; j++) {
472                 /* we dont need to ourself ourselves */
473                 if (nodemap->nodes[j].pnn == pnn) {
474                         continue;
475                 }
476                 /* dont check nodes that are unavailable */
477                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
478                         continue;
479                 }
480
481                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
482                                          mem_ctx, &remote_dbmap);
483                 if (ret != 0) {
484                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
485                         return -1;
486                 }
487
488                 /* step through all databases on the remote node */
489                 for (db=0; db<remote_dbmap->num;db++) {
490                         const char *name;
491
492                         for (i=0;i<(*dbmap)->num;i++) {
493                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
494                                         break;
495                                 }
496                         }
497                         /* we already have this db locally */
498                         if (i!=(*dbmap)->num) {
499                                 continue;
500                         }
501                         /* ok so we need to create this database and
502                            rebuild dbmap
503                          */
504                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
505                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
506                         if (ret != 0) {
507                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
508                                           nodemap->nodes[j].pnn));
509                                 return -1;
510                         }
511                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
512                                            remote_dbmap->dbs[db].persistent);
513                         if (ret != 0) {
514                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
515                                 return -1;
516                         }
517                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
518                         if (ret != 0) {
519                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
520                                 return -1;
521                         }
522                 }
523         }
524
525         return 0;
526 }
527
528
529 /*
530   pull the remote database contents from one node into the recdb
531  */
532 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
533                                     struct tdb_wrap *recdb, uint32_t dbid)
534 {
535         int ret;
536         TDB_DATA outdata;
537         struct ctdb_marshall_buffer *reply;
538         struct ctdb_rec_data *rec;
539         int i;
540         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
541
542         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
543                                CONTROL_TIMEOUT(), &outdata);
544         if (ret != 0) {
545                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
546                 talloc_free(tmp_ctx);
547                 return -1;
548         }
549
550         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
551
552         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
553                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
554                 talloc_free(tmp_ctx);
555                 return -1;
556         }
557         
558         rec = (struct ctdb_rec_data *)&reply->data[0];
559         
560         for (i=0;
561              i<reply->count;
562              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
563                 TDB_DATA key, data;
564                 struct ctdb_ltdb_header *hdr;
565                 TDB_DATA existing;
566                 
567                 key.dptr = &rec->data[0];
568                 key.dsize = rec->keylen;
569                 data.dptr = &rec->data[key.dsize];
570                 data.dsize = rec->datalen;
571                 
572                 hdr = (struct ctdb_ltdb_header *)data.dptr;
573
574                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
575                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
576                         talloc_free(tmp_ctx);
577                         return -1;
578                 }
579
580                 /* fetch the existing record, if any */
581                 existing = tdb_fetch(recdb->tdb, key);
582                 
583                 if (existing.dptr != NULL) {
584                         struct ctdb_ltdb_header header;
585                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
586                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
587                                          (unsigned)existing.dsize, srcnode));
588                                 free(existing.dptr);
589                                 talloc_free(tmp_ctx);
590                                 return -1;
591                         }
592                         header = *(struct ctdb_ltdb_header *)existing.dptr;
593                         free(existing.dptr);
594                         if (!(header.rsn < hdr->rsn ||
595                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
596                                 continue;
597                         }
598                 }
599                 
600                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
601                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
602                         talloc_free(tmp_ctx);
603                         return -1;                              
604                 }
605         }
606
607         talloc_free(tmp_ctx);
608
609         return 0;
610 }
611
612 /*
613   pull all the remote database contents into the recdb
614  */
615 static int pull_remote_database(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
616                                 struct tdb_wrap *recdb, uint32_t dbid)
617 {
618         int j;
619
620         /* pull all records from all other nodes across onto this node
621            (this merges based on rsn)
622         */
623         for (j=0; j<nodemap->num; j++) {
624                 /* dont merge from nodes that are unavailable */
625                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
626                         continue;
627                 }
628                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
629                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
630                                  nodemap->nodes[j].pnn));
631                         return -1;
632                 }
633         }
634         
635         return 0;
636 }
637
638
639 /*
640   update flags on all active nodes
641  */
642 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
643 {
644         int i;
645         for (i=0;i<nodemap->num;i++) {
646                 int ret;
647
648                 ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, nodemap->nodes[i].flags, ~nodemap->nodes[i].flags);
649                 if (ret != 0) {
650                         DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
651                         return -1;
652                 }
653         }
654         return 0;
655 }
656
657 static int update_our_flags_on_all_nodes(struct ctdb_context *ctdb, uint32_t pnn, struct ctdb_node_map *nodemap)
658 {
659         int ret;
660
661         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[pnn].pnn, nodemap->nodes[pnn].flags, ~nodemap->nodes[pnn].flags);
662         if (ret != 0) {
663                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
664                 return -1;
665         }
666
667         return 0;
668 }
669
670 /*
671   ensure all nodes have the same vnnmap we do
672  */
673 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
674                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
675 {
676         int j, ret;
677
678         /* push the new vnn map out to all the nodes */
679         for (j=0; j<nodemap->num; j++) {
680                 /* dont push to nodes that are unavailable */
681                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
682                         continue;
683                 }
684
685                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
686                 if (ret != 0) {
687                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
688                         return -1;
689                 }
690         }
691
692         return 0;
693 }
694
695
696 /*
697   handler for when the admin bans a node
698 */
699 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
700                         TDB_DATA data, void *private_data)
701 {
702         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
703         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
704         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
705
706         if (data.dsize != sizeof(*b)) {
707                 DEBUG(DEBUG_ERR,("Bad data in ban_handler\n"));
708                 talloc_free(mem_ctx);
709                 return;
710         }
711
712         if (b->pnn != ctdb->pnn) {
713                 DEBUG(DEBUG_ERR,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
714                 return;
715         }
716
717         DEBUG(DEBUG_NOTICE,("Node %u has been banned for %u seconds\n", 
718                  b->pnn, b->ban_time));
719
720         ctdb_ban_node(rec, b->pnn, b->ban_time);
721         talloc_free(mem_ctx);
722 }
723
724 /*
725   handler for when the admin unbans a node
726 */
727 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
728                           TDB_DATA data, void *private_data)
729 {
730         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
731         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
732         uint32_t pnn;
733
734         if (data.dsize != sizeof(uint32_t)) {
735                 DEBUG(DEBUG_ERR,("Bad data in unban_handler\n"));
736                 talloc_free(mem_ctx);
737                 return;
738         }
739         pnn = *(uint32_t *)data.dptr;
740
741         if (pnn != ctdb->pnn) {
742                 DEBUG(DEBUG_ERR,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
743                 return;
744         }
745
746         DEBUG(DEBUG_NOTICE,("Node %u has been unbanned.\n", pnn));
747         ctdb_unban_node(rec, pnn);
748         talloc_free(mem_ctx);
749 }
750
751
752 struct vacuum_info {
753         struct vacuum_info *next, *prev;
754         struct ctdb_recoverd *rec;
755         uint32_t srcnode;
756         struct ctdb_db_context *ctdb_db;
757         struct ctdb_marshall_buffer *recs;
758         struct ctdb_rec_data *r;
759 };
760
761 static void vacuum_fetch_next(struct vacuum_info *v);
762
763 /*
764   called when a vacuum fetch has completed - just free it and do the next one
765  */
766 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
767 {
768         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
769         talloc_free(state);
770         vacuum_fetch_next(v);
771 }
772
773
774 /*
775   process the next element from the vacuum list
776 */
777 static void vacuum_fetch_next(struct vacuum_info *v)
778 {
779         struct ctdb_call call;
780         struct ctdb_rec_data *r;
781
782         while (v->recs->count) {
783                 struct ctdb_client_call_state *state;
784                 TDB_DATA data;
785                 struct ctdb_ltdb_header *hdr;
786
787                 ZERO_STRUCT(call);
788                 call.call_id = CTDB_NULL_FUNC;
789                 call.flags = CTDB_IMMEDIATE_MIGRATION;
790
791                 r = v->r;
792                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
793                 v->recs->count--;
794
795                 call.key.dptr = &r->data[0];
796                 call.key.dsize = r->keylen;
797
798                 /* ensure we don't block this daemon - just skip a record if we can't get
799                    the chainlock */
800                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
801                         continue;
802                 }
803
804                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
805                 if (data.dptr == NULL) {
806                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
807                         continue;
808                 }
809
810                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
811                         free(data.dptr);
812                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
813                         continue;
814                 }
815                 
816                 hdr = (struct ctdb_ltdb_header *)data.dptr;
817                 if (hdr->dmaster == v->rec->ctdb->pnn) {
818                         /* its already local */
819                         free(data.dptr);
820                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
821                         continue;
822                 }
823
824                 free(data.dptr);
825
826                 state = ctdb_call_send(v->ctdb_db, &call);
827                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
828                 if (state == NULL) {
829                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
830                         talloc_free(v);
831                         return;
832                 }
833                 state->async.fn = vacuum_fetch_callback;
834                 state->async.private_data = v;
835                 return;
836         }
837
838         talloc_free(v);
839 }
840
841
842 /*
843   destroy a vacuum info structure
844  */
845 static int vacuum_info_destructor(struct vacuum_info *v)
846 {
847         DLIST_REMOVE(v->rec->vacuum_info, v);
848         return 0;
849 }
850
851
852 /*
853   handler for vacuum fetch
854 */
855 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
856                                  TDB_DATA data, void *private_data)
857 {
858         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
859         struct ctdb_marshall_buffer *recs;
860         int ret, i;
861         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
862         const char *name;
863         struct ctdb_dbid_map *dbmap=NULL;
864         bool persistent = false;
865         struct ctdb_db_context *ctdb_db;
866         struct ctdb_rec_data *r;
867         uint32_t srcnode;
868         struct vacuum_info *v;
869
870         recs = (struct ctdb_marshall_buffer *)data.dptr;
871         r = (struct ctdb_rec_data *)&recs->data[0];
872
873         if (recs->count == 0) {
874                 talloc_free(tmp_ctx);
875                 return;
876         }
877
878         srcnode = r->reqid;
879
880         for (v=rec->vacuum_info;v;v=v->next) {
881                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
882                         /* we're already working on records from this node */
883                         talloc_free(tmp_ctx);
884                         return;
885                 }
886         }
887
888         /* work out if the database is persistent */
889         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
890         if (ret != 0) {
891                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
892                 talloc_free(tmp_ctx);
893                 return;
894         }
895
896         for (i=0;i<dbmap->num;i++) {
897                 if (dbmap->dbs[i].dbid == recs->db_id) {
898                         persistent = dbmap->dbs[i].persistent;
899                         break;
900                 }
901         }
902         if (i == dbmap->num) {
903                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
904                 talloc_free(tmp_ctx);
905                 return;         
906         }
907
908         /* find the name of this database */
909         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
910                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
911                 talloc_free(tmp_ctx);
912                 return;
913         }
914
915         /* attach to it */
916         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
917         if (ctdb_db == NULL) {
918                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
919                 talloc_free(tmp_ctx);
920                 return;
921         }
922
923         v = talloc_zero(rec, struct vacuum_info);
924         if (v == NULL) {
925                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
926                 talloc_free(tmp_ctx);
927                 return;
928         }
929
930         v->rec = rec;
931         v->srcnode = srcnode;
932         v->ctdb_db = ctdb_db;
933         v->recs = talloc_memdup(v, recs, data.dsize);
934         if (v->recs == NULL) {
935                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
936                 talloc_free(v);
937                 talloc_free(tmp_ctx);
938                 return;         
939         }
940         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
941
942         DLIST_ADD(rec->vacuum_info, v);
943
944         talloc_set_destructor(v, vacuum_info_destructor);
945
946         vacuum_fetch_next(v);
947         talloc_free(tmp_ctx);
948 }
949
950
951 /*
952   called when ctdb_wait_timeout should finish
953  */
954 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
955                               struct timeval yt, void *p)
956 {
957         uint32_t *timed_out = (uint32_t *)p;
958         (*timed_out) = 1;
959 }
960
961 /*
962   wait for a given number of seconds
963  */
964 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
965 {
966         uint32_t timed_out = 0;
967         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
968         while (!timed_out) {
969                 event_loop_once(ctdb->ev);
970         }
971 }
972
973 /*
974   called when an election times out (ends)
975  */
976 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
977                                   struct timeval t, void *p)
978 {
979         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
980         rec->election_timeout = NULL;
981 }
982
983
984 /*
985   wait for an election to finish. It finished election_timeout seconds after
986   the last election packet is received
987  */
988 static void ctdb_wait_election(struct ctdb_recoverd *rec)
989 {
990         struct ctdb_context *ctdb = rec->ctdb;
991         while (rec->election_timeout) {
992                 event_loop_once(ctdb->ev);
993         }
994 }
995
996 /*
997   Update our local flags from all remote connected nodes. 
998   This is only run when we are or we belive we are the recovery master
999  */
1000 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1001 {
1002         int j;
1003         struct ctdb_context *ctdb = rec->ctdb;
1004         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1005
1006         /* get the nodemap for all active remote nodes and verify
1007            they are the same as for this node
1008          */
1009         for (j=0; j<nodemap->num; j++) {
1010                 struct ctdb_node_map *remote_nodemap=NULL;
1011                 int ret;
1012
1013                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1014                         continue;
1015                 }
1016                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1017                         continue;
1018                 }
1019
1020                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1021                                            mem_ctx, &remote_nodemap);
1022                 if (ret != 0) {
1023                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1024                                   nodemap->nodes[j].pnn));
1025                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1026                         talloc_free(mem_ctx);
1027                         return MONITOR_FAILED;
1028                 }
1029                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1030                         int ban_changed = (nodemap->nodes[j].flags ^ remote_nodemap->nodes[j].flags) & NODE_FLAGS_BANNED;
1031
1032                         if (ban_changed) {
1033                                 DEBUG(DEBUG_NOTICE,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
1034                                 nodemap->nodes[j].pnn,
1035                                 remote_nodemap->nodes[j].flags,
1036                                 nodemap->nodes[j].flags));
1037                         }
1038
1039                         /* We should tell our daemon about this so it
1040                            updates its flags or else we will log the same 
1041                            message again in the next iteration of recovery.
1042                            Since we are the recovery master we can just as
1043                            well update the flags on all nodes.
1044                         */
1045                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
1046                         if (ret != 0) {
1047                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1048                                 return -1;
1049                         }
1050
1051                         /* Update our local copy of the flags in the recovery
1052                            daemon.
1053                         */
1054                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1055                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1056                                  nodemap->nodes[j].flags));
1057                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1058
1059                         /* If the BANNED flag has changed for the node
1060                            this is a good reason to do a new election.
1061                          */
1062                         if (ban_changed) {
1063                                 talloc_free(mem_ctx);
1064                                 return MONITOR_ELECTION_NEEDED;
1065                         }
1066
1067                 }
1068                 talloc_free(remote_nodemap);
1069         }
1070         talloc_free(mem_ctx);
1071         return MONITOR_OK;
1072 }
1073
1074
1075 /* Create a new random generation ip. 
1076    The generation id can not be the INVALID_GENERATION id
1077 */
1078 static uint32_t new_generation(void)
1079 {
1080         uint32_t generation;
1081
1082         while (1) {
1083                 generation = random();
1084
1085                 if (generation != INVALID_GENERATION) {
1086                         break;
1087                 }
1088         }
1089
1090         return generation;
1091 }
1092
1093
1094 /*
1095   create a temporary working database
1096  */
1097 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1098 {
1099         char *name;
1100         struct tdb_wrap *recdb;
1101         unsigned tdb_flags;
1102
1103         /* open up the temporary recovery database */
1104         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
1105         if (name == NULL) {
1106                 return NULL;
1107         }
1108         unlink(name);
1109
1110         tdb_flags = TDB_NOLOCK;
1111         if (!ctdb->do_setsched) {
1112                 tdb_flags |= TDB_NOMMAP;
1113         }
1114
1115         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1116                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1117         if (recdb == NULL) {
1118                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1119         }
1120
1121         talloc_free(name);
1122
1123         return recdb;
1124 }
1125
1126
1127 /* 
1128    a traverse function for pulling all relevent records from recdb
1129  */
1130 struct recdb_data {
1131         struct ctdb_context *ctdb;
1132         struct ctdb_marshall_buffer *recdata;
1133         uint32_t len;
1134         bool failed;
1135 };
1136
1137 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1138 {
1139         struct recdb_data *params = (struct recdb_data *)p;
1140         struct ctdb_rec_data *rec;
1141         struct ctdb_ltdb_header *hdr;
1142
1143         /* skip empty records */
1144         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1145                 return 0;
1146         }
1147
1148         /* update the dmaster field to point to us */
1149         hdr = (struct ctdb_ltdb_header *)data.dptr;
1150         hdr->dmaster = params->ctdb->pnn;
1151
1152         /* add the record to the blob ready to send to the nodes */
1153         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1154         if (rec == NULL) {
1155                 params->failed = true;
1156                 return -1;
1157         }
1158         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1159         if (params->recdata == NULL) {
1160                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1161                          rec->length + params->len, params->recdata->count));
1162                 params->failed = true;
1163                 return -1;
1164         }
1165         params->recdata->count++;
1166         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1167         params->len += rec->length;
1168         talloc_free(rec);
1169
1170         return 0;
1171 }
1172
1173 /*
1174   push the recdb database out to all nodes
1175  */
1176 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1177                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1178 {
1179         struct recdb_data params;
1180         struct ctdb_marshall_buffer *recdata;
1181         TDB_DATA outdata;
1182         TALLOC_CTX *tmp_ctx;
1183         uint32_t *nodes;
1184
1185         tmp_ctx = talloc_new(ctdb);
1186         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1187
1188         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1189         CTDB_NO_MEMORY(ctdb, recdata);
1190
1191         recdata->db_id = dbid;
1192
1193         params.ctdb = ctdb;
1194         params.recdata = recdata;
1195         params.len = offsetof(struct ctdb_marshall_buffer, data);
1196         params.failed = false;
1197
1198         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1199                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1200                 talloc_free(params.recdata);
1201                 talloc_free(tmp_ctx);
1202                 return -1;
1203         }
1204
1205         if (params.failed) {
1206                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1207                 talloc_free(params.recdata);
1208                 talloc_free(tmp_ctx);
1209                 return -1;              
1210         }
1211
1212         recdata = params.recdata;
1213
1214         outdata.dptr = (void *)recdata;
1215         outdata.dsize = params.len;
1216
1217         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1218         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1219                                         nodes,
1220                                         CONTROL_TIMEOUT(), false, outdata,
1221                                         NULL, NULL,
1222                                         NULL) != 0) {
1223                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1224                 talloc_free(recdata);
1225                 talloc_free(tmp_ctx);
1226                 return -1;
1227         }
1228
1229         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1230                   dbid, recdata->count));
1231
1232         talloc_free(recdata);
1233         talloc_free(tmp_ctx);
1234
1235         return 0;
1236 }
1237
1238
1239 /*
1240   go through a full recovery on one database 
1241  */
1242 static int recover_database(struct ctdb_recoverd *rec, 
1243                             TALLOC_CTX *mem_ctx,
1244                             uint32_t dbid,
1245                             uint32_t pnn, 
1246                             struct ctdb_node_map *nodemap,
1247                             uint32_t transaction_id)
1248 {
1249         struct tdb_wrap *recdb;
1250         int ret;
1251         struct ctdb_context *ctdb = rec->ctdb;
1252         TDB_DATA data;
1253         struct ctdb_control_wipe_database w;
1254         uint32_t *nodes;
1255
1256         recdb = create_recdb(ctdb, mem_ctx);
1257         if (recdb == NULL) {
1258                 return -1;
1259         }
1260
1261         /* pull all remote databases onto the recdb */
1262         ret = pull_remote_database(ctdb, nodemap, recdb, dbid);
1263         if (ret != 0) {
1264                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1265                 return -1;
1266         }
1267
1268         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1269
1270         /* wipe all the remote databases. This is safe as we are in a transaction */
1271         w.db_id = dbid;
1272         w.transaction_id = transaction_id;
1273
1274         data.dptr = (void *)&w;
1275         data.dsize = sizeof(w);
1276
1277         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1278         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1279                                         nodes,
1280                                         CONTROL_TIMEOUT(), false, data,
1281                                         NULL, NULL,
1282                                         NULL) != 0) {
1283                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1284                 talloc_free(recdb);
1285                 return -1;
1286         }
1287         
1288         /* push out the correct database. This sets the dmaster and skips 
1289            the empty records */
1290         ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
1291         if (ret != 0) {
1292                 talloc_free(recdb);
1293                 return -1;
1294         }
1295
1296         /* all done with this database */
1297         talloc_free(recdb);
1298
1299         return 0;
1300 }
1301
1302 /*
1303   reload the nodes file 
1304 */
1305 static void reload_nodes_file(struct ctdb_context *ctdb)
1306 {
1307         ctdb->nodes = NULL;
1308         ctdb_load_nodes_file(ctdb);
1309 }
1310
1311         
1312 /*
1313   we are the recmaster, and recovery is needed - start a recovery run
1314  */
1315 static int do_recovery(struct ctdb_recoverd *rec, 
1316                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1317                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
1318                        int32_t culprit)
1319 {
1320         struct ctdb_context *ctdb = rec->ctdb;
1321         int i, j, ret;
1322         uint32_t generation;
1323         struct ctdb_dbid_map *dbmap;
1324         TDB_DATA data;
1325         uint32_t *nodes;
1326
1327         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1328
1329         if (ctdb->num_nodes != nodemap->num) {
1330                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
1331                 reload_nodes_file(ctdb);
1332                 return -1;
1333         }
1334
1335         /* if recovery fails, force it again */
1336         rec->need_recovery = true;
1337
1338         if (culprit != -1) {
1339                 ctdb_set_culprit(rec, culprit);
1340         }
1341
1342         if (rec->culprit_counter > 2*nodemap->num) {
1343                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
1344                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
1345                          ctdb->tunable.recovery_ban_period));
1346                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
1347         }
1348
1349         if (!ctdb_recovery_lock(ctdb, true)) {
1350                 ctdb_set_culprit(rec, pnn);
1351                 DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1352                 return -1;
1353         }
1354
1355         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
1356
1357         /* get a list of all databases */
1358         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1359         if (ret != 0) {
1360                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1361                 return -1;
1362         }
1363
1364         /* we do the db creation before we set the recovery mode, so the freeze happens
1365            on all databases we will be dealing with. */
1366
1367         /* verify that we have all the databases any other node has */
1368         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1369         if (ret != 0) {
1370                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1371                 return -1;
1372         }
1373
1374         /* verify that all other nodes have all our databases */
1375         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1376         if (ret != 0) {
1377                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1378                 return -1;
1379         }
1380
1381         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1382
1383
1384         /* set recovery mode to active on all nodes */
1385         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1386         if (ret != 0) {
1387                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1388                 return -1;
1389         }
1390
1391         /* execute the "startrecovery" event script on all nodes */
1392         ret = run_startrecovery_eventscript(rec, nodemap);
1393         if (ret!=0) {
1394                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1395                 return -1;
1396         }
1397
1398         /* pick a new generation number */
1399         generation = new_generation();
1400
1401         /* change the vnnmap on this node to use the new generation 
1402            number but not on any other nodes.
1403            this guarantees that if we abort the recovery prematurely
1404            for some reason (a node stops responding?)
1405            that we can just return immediately and we will reenter
1406            recovery shortly again.
1407            I.e. we deliberately leave the cluster with an inconsistent
1408            generation id to allow us to abort recovery at any stage and
1409            just restart it from scratch.
1410          */
1411         vnnmap->generation = generation;
1412         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1413         if (ret != 0) {
1414                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1415                 return -1;
1416         }
1417
1418         data.dptr = (void *)&generation;
1419         data.dsize = sizeof(uint32_t);
1420
1421         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1422         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1423                                         nodes,
1424                                         CONTROL_TIMEOUT(), false, data,
1425                                         NULL, NULL,
1426                                         NULL) != 0) {
1427                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1428                 return -1;
1429         }
1430
1431         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1432
1433         for (i=0;i<dbmap->num;i++) {
1434                 if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
1435                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1436                         return -1;
1437                 }
1438         }
1439
1440         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1441
1442         /* commit all the changes */
1443         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1444                                         nodes,
1445                                         CONTROL_TIMEOUT(), false, data,
1446                                         NULL, NULL,
1447                                         NULL) != 0) {
1448                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1449                 return -1;
1450         }
1451
1452         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1453         
1454
1455         /* update the capabilities for all nodes */
1456         ret = update_capabilities(ctdb, nodemap);
1457         if (ret!=0) {
1458                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1459                 return -1;
1460         }
1461
1462         /* build a new vnn map with all the currently active and
1463            unbanned nodes */
1464         generation = new_generation();
1465         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1466         CTDB_NO_MEMORY(ctdb, vnnmap);
1467         vnnmap->generation = generation;
1468         vnnmap->size = 0;
1469         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1470         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1471         for (i=j=0;i<nodemap->num;i++) {
1472                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1473                         continue;
1474                 }
1475                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1476                         /* this node can not be an lmaster */
1477                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1478                         continue;
1479                 }
1480
1481                 vnnmap->size++;
1482                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1483                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1484                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1485
1486         }
1487         if (vnnmap->size == 0) {
1488                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1489                 vnnmap->size++;
1490                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1491                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1492                 vnnmap->map[0] = pnn;
1493         }       
1494
1495         /* update to the new vnnmap on all nodes */
1496         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1497         if (ret != 0) {
1498                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1499                 return -1;
1500         }
1501
1502         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1503
1504         /* update recmaster to point to us for all nodes */
1505         ret = set_recovery_master(ctdb, nodemap, pnn);
1506         if (ret!=0) {
1507                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1508                 return -1;
1509         }
1510
1511         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1512
1513         /*
1514           update all nodes to have the same flags that we have
1515          */
1516         ret = update_flags_on_all_nodes(ctdb, nodemap);
1517         if (ret != 0) {
1518                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes\n"));
1519                 return -1;
1520         }
1521         
1522         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1523
1524         /* disable recovery mode */
1525         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
1526         if (ret != 0) {
1527                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1528                 return -1;
1529         }
1530
1531         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1532
1533         /*
1534           tell nodes to takeover their public IPs
1535          */
1536         rec->need_takeover_run = false;
1537         ret = ctdb_takeover_run(ctdb, nodemap);
1538         if (ret != 0) {
1539                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1540                 return -1;
1541         }
1542         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1543
1544         /* execute the "recovered" event script on all nodes */
1545         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1546         if (ret!=0) {
1547                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1548                 return -1;
1549         }
1550
1551         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1552
1553         /* send a message to all clients telling them that the cluster 
1554            has been reconfigured */
1555         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1556
1557         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1558
1559         rec->need_recovery = false;
1560
1561         /* We just finished a recovery successfully. 
1562            We now wait for rerecovery_timeout before we allow 
1563            another recovery to take place.
1564         */
1565         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1566         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1567         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1568
1569         return 0;
1570 }
1571
1572
1573 /*
1574   elections are won by first checking the number of connected nodes, then
1575   the priority time, then the pnn
1576  */
1577 struct election_message {
1578         uint32_t num_connected;
1579         struct timeval priority_time;
1580         uint32_t pnn;
1581         uint32_t node_flags;
1582 };
1583
1584 /*
1585   form this nodes election data
1586  */
1587 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1588 {
1589         int ret, i;
1590         struct ctdb_node_map *nodemap;
1591         struct ctdb_context *ctdb = rec->ctdb;
1592
1593         ZERO_STRUCTP(em);
1594
1595         em->pnn = rec->ctdb->pnn;
1596         em->priority_time = rec->priority_time;
1597         em->node_flags = rec->node_flags;
1598
1599         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1600         if (ret != 0) {
1601                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1602                 return;
1603         }
1604
1605         for (i=0;i<nodemap->num;i++) {
1606                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1607                         em->num_connected++;
1608                 }
1609         }
1610
1611         /* we shouldnt try to win this election if we cant be a recmaster */
1612         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1613                 em->num_connected = 0;
1614                 em->priority_time = timeval_current();
1615         }
1616
1617         talloc_free(nodemap);
1618 }
1619
1620 /*
1621   see if the given election data wins
1622  */
1623 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1624 {
1625         struct election_message myem;
1626         int cmp = 0;
1627
1628         ctdb_election_data(rec, &myem);
1629
1630         /* we cant win if we dont have the recmaster capability */
1631         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1632                 return false;
1633         }
1634
1635         /* we cant win if we are banned */
1636         if (rec->node_flags & NODE_FLAGS_BANNED) {
1637                 return false;
1638         }       
1639
1640         /* we will automatically win if the other node is banned */
1641         if (em->node_flags & NODE_FLAGS_BANNED) {
1642                 return true;
1643         }
1644
1645         /* try to use the most connected node */
1646         if (cmp == 0) {
1647                 cmp = (int)myem.num_connected - (int)em->num_connected;
1648         }
1649
1650         /* then the longest running node */
1651         if (cmp == 0) {
1652                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1653         }
1654
1655         if (cmp == 0) {
1656                 cmp = (int)myem.pnn - (int)em->pnn;
1657         }
1658
1659         return cmp > 0;
1660 }
1661
1662 /*
1663   send out an election request
1664  */
1665 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1666 {
1667         int ret;
1668         TDB_DATA election_data;
1669         struct election_message emsg;
1670         uint64_t srvid;
1671         struct ctdb_context *ctdb = rec->ctdb;
1672
1673         srvid = CTDB_SRVID_RECOVERY;
1674
1675         ctdb_election_data(rec, &emsg);
1676
1677         election_data.dsize = sizeof(struct election_message);
1678         election_data.dptr  = (unsigned char *)&emsg;
1679
1680
1681         /* send an election message to all active nodes */
1682         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1683
1684
1685         /* A new node that is already frozen has entered the cluster.
1686            The existing nodes are not frozen and dont need to be frozen
1687            until the election has ended and we start the actual recovery
1688         */
1689         if (update_recmaster == true) {
1690                 /* first we assume we will win the election and set 
1691                    recoverymaster to be ourself on the current node
1692                  */
1693                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1694                 if (ret != 0) {
1695                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1696                         return -1;
1697                 }
1698         }
1699
1700
1701         return 0;
1702 }
1703
1704 /*
1705   this function will unban all nodes in the cluster
1706 */
1707 static void unban_all_nodes(struct ctdb_context *ctdb)
1708 {
1709         int ret, i;
1710         struct ctdb_node_map *nodemap;
1711         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1712         
1713         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1714         if (ret != 0) {
1715                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1716                 return;
1717         }
1718
1719         for (i=0;i<nodemap->num;i++) {
1720                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1721                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1722                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1723                 }
1724         }
1725
1726         talloc_free(tmp_ctx);
1727 }
1728
1729
1730 /*
1731   we think we are winning the election - send a broadcast election request
1732  */
1733 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1734 {
1735         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1736         int ret;
1737
1738         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1739         if (ret != 0) {
1740                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1741         }
1742
1743         talloc_free(rec->send_election_te);
1744         rec->send_election_te = NULL;
1745 }
1746
1747 /*
1748   handler for memory dumps
1749 */
1750 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1751                              TDB_DATA data, void *private_data)
1752 {
1753         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1754         TDB_DATA *dump;
1755         int ret;
1756         struct rd_memdump_reply *rd;
1757
1758         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1759                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1760                 talloc_free(tmp_ctx);
1761                 return;
1762         }
1763         rd = (struct rd_memdump_reply *)data.dptr;
1764
1765         dump = talloc_zero(tmp_ctx, TDB_DATA);
1766         if (dump == NULL) {
1767                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1768                 talloc_free(tmp_ctx);
1769                 return;
1770         }
1771         ret = ctdb_dump_memory(ctdb, dump);
1772         if (ret != 0) {
1773                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1774                 talloc_free(tmp_ctx);
1775                 return;
1776         }
1777
1778 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1779
1780         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1781         if (ret != 0) {
1782                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1783                 talloc_free(tmp_ctx);
1784                 return;
1785         }
1786
1787         talloc_free(tmp_ctx);
1788 }
1789
1790 /*
1791   handler for recovery master elections
1792 */
1793 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1794                              TDB_DATA data, void *private_data)
1795 {
1796         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1797         int ret;
1798         struct election_message *em = (struct election_message *)data.dptr;
1799         TALLOC_CTX *mem_ctx;
1800
1801         /* we got an election packet - update the timeout for the election */
1802         talloc_free(rec->election_timeout);
1803         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1804                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1805                                                 ctdb_election_timeout, rec);
1806
1807         mem_ctx = talloc_new(ctdb);
1808
1809         /* someone called an election. check their election data
1810            and if we disagree and we would rather be the elected node, 
1811            send a new election message to all other nodes
1812          */
1813         if (ctdb_election_win(rec, em)) {
1814                 if (!rec->send_election_te) {
1815                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1816                                                                 timeval_current_ofs(0, 500000),
1817                                                                 election_send_request, rec);
1818                 }
1819                 talloc_free(mem_ctx);
1820                 /*unban_all_nodes(ctdb);*/
1821                 return;
1822         }
1823         
1824         /* we didn't win */
1825         talloc_free(rec->send_election_te);
1826         rec->send_election_te = NULL;
1827
1828         /* release the recmaster lock */
1829         if (em->pnn != ctdb->pnn &&
1830             ctdb->recovery_lock_fd != -1) {
1831                 close(ctdb->recovery_lock_fd);
1832                 ctdb->recovery_lock_fd = -1;
1833                 unban_all_nodes(ctdb);
1834         }
1835
1836         /* ok, let that guy become recmaster then */
1837         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1838         if (ret != 0) {
1839                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
1840                 talloc_free(mem_ctx);
1841                 return;
1842         }
1843
1844         /* release any bans */
1845         rec->last_culprit = (uint32_t)-1;
1846         talloc_free(rec->banned_nodes);
1847         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1848         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1849
1850         talloc_free(mem_ctx);
1851         return;
1852 }
1853
1854
1855 /*
1856   force the start of the election process
1857  */
1858 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1859                            struct ctdb_node_map *nodemap)
1860 {
1861         int ret;
1862         struct ctdb_context *ctdb = rec->ctdb;
1863
1864         /* set all nodes to recovery mode to stop all internode traffic */
1865         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1866         if (ret != 0) {
1867                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1868                 return;
1869         }
1870
1871         talloc_free(rec->election_timeout);
1872         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1873                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1874                                                 ctdb_election_timeout, rec);
1875
1876         ret = send_election_request(rec, pnn, true);
1877         if (ret!=0) {
1878                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
1879                 return;
1880         }
1881
1882         /* wait for a few seconds to collect all responses */
1883         ctdb_wait_election(rec);
1884 }
1885
1886
1887
1888 /*
1889   handler for when a node changes its flags
1890 */
1891 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1892                             TDB_DATA data, void *private_data)
1893 {
1894         int ret;
1895         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1896         struct ctdb_node_map *nodemap=NULL;
1897         TALLOC_CTX *tmp_ctx;
1898         uint32_t changed_flags;
1899         int i;
1900         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1901
1902         if (data.dsize != sizeof(*c)) {
1903                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1904                 return;
1905         }
1906
1907         tmp_ctx = talloc_new(ctdb);
1908         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1909
1910         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1911         if (ret != 0) {
1912                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1913                 talloc_free(tmp_ctx);
1914                 return;         
1915         }
1916
1917
1918         for (i=0;i<nodemap->num;i++) {
1919                 if (nodemap->nodes[i].pnn == c->pnn) break;
1920         }
1921
1922         if (i == nodemap->num) {
1923                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1924                 talloc_free(tmp_ctx);
1925                 return;
1926         }
1927
1928         changed_flags = c->old_flags ^ c->new_flags;
1929
1930         if (nodemap->nodes[i].flags != c->new_flags) {
1931                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1932         }
1933
1934         nodemap->nodes[i].flags = c->new_flags;
1935
1936         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1937                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1938
1939         if (ret == 0) {
1940                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1941                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1942         }
1943         
1944         if (ret == 0 &&
1945             ctdb->recovery_master == ctdb->pnn &&
1946             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
1947                 /* Only do the takeover run if the perm disabled or unhealthy
1948                    flags changed since these will cause an ip failover but not
1949                    a recovery.
1950                    If the node became disconnected or banned this will also
1951                    lead to an ip address failover but that is handled 
1952                    during recovery
1953                 */
1954                 if (changed_flags & NODE_FLAGS_DISABLED) {
1955                         rec->need_takeover_run = true;
1956                 }
1957         }
1958
1959         talloc_free(tmp_ctx);
1960 }
1961
1962 /*
1963   handler for when we need to push out flag changes ot all other nodes
1964 */
1965 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1966                             TDB_DATA data, void *private_data)
1967 {
1968         int ret;
1969         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1970
1971         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), c->pnn, c->new_flags, ~c->new_flags);
1972         if (ret != 0) {
1973                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1974         }
1975 }
1976
1977
1978 struct verify_recmode_normal_data {
1979         uint32_t count;
1980         enum monitor_result status;
1981 };
1982
1983 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1984 {
1985         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
1986
1987
1988         /* one more node has responded with recmode data*/
1989         rmdata->count--;
1990
1991         /* if we failed to get the recmode, then return an error and let
1992            the main loop try again.
1993         */
1994         if (state->state != CTDB_CONTROL_DONE) {
1995                 if (rmdata->status == MONITOR_OK) {
1996                         rmdata->status = MONITOR_FAILED;
1997                 }
1998                 return;
1999         }
2000
2001         /* if we got a response, then the recmode will be stored in the
2002            status field
2003         */
2004         if (state->status != CTDB_RECOVERY_NORMAL) {
2005                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2006                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2007         }
2008
2009         return;
2010 }
2011
2012
2013 /* verify that all nodes are in normal recovery mode */
2014 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2015 {
2016         struct verify_recmode_normal_data *rmdata;
2017         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2018         struct ctdb_client_control_state *state;
2019         enum monitor_result status;
2020         int j;
2021         
2022         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2023         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2024         rmdata->count  = 0;
2025         rmdata->status = MONITOR_OK;
2026
2027         /* loop over all active nodes and send an async getrecmode call to 
2028            them*/
2029         for (j=0; j<nodemap->num; j++) {
2030                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2031                         continue;
2032                 }
2033                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2034                                         CONTROL_TIMEOUT(), 
2035                                         nodemap->nodes[j].pnn);
2036                 if (state == NULL) {
2037                         /* we failed to send the control, treat this as 
2038                            an error and try again next iteration
2039                         */                      
2040                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2041                         talloc_free(mem_ctx);
2042                         return MONITOR_FAILED;
2043                 }
2044
2045                 /* set up the callback functions */
2046                 state->async.fn = verify_recmode_normal_callback;
2047                 state->async.private_data = rmdata;
2048
2049                 /* one more control to wait for to complete */
2050                 rmdata->count++;
2051         }
2052
2053
2054         /* now wait for up to the maximum number of seconds allowed
2055            or until all nodes we expect a response from has replied
2056         */
2057         while (rmdata->count > 0) {
2058                 event_loop_once(ctdb->ev);
2059         }
2060
2061         status = rmdata->status;
2062         talloc_free(mem_ctx);
2063         return status;
2064 }
2065
2066
2067 struct verify_recmaster_data {
2068         struct ctdb_recoverd *rec;
2069         uint32_t count;
2070         uint32_t pnn;
2071         enum monitor_result status;
2072 };
2073
2074 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2075 {
2076         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2077
2078
2079         /* one more node has responded with recmaster data*/
2080         rmdata->count--;
2081
2082         /* if we failed to get the recmaster, then return an error and let
2083            the main loop try again.
2084         */
2085         if (state->state != CTDB_CONTROL_DONE) {
2086                 if (rmdata->status == MONITOR_OK) {
2087                         rmdata->status = MONITOR_FAILED;
2088                 }
2089                 return;
2090         }
2091
2092         /* if we got a response, then the recmaster will be stored in the
2093            status field
2094         */
2095         if (state->status != rmdata->pnn) {
2096                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2097                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2098                 rmdata->status = MONITOR_ELECTION_NEEDED;
2099         }
2100
2101         return;
2102 }
2103
2104
2105 /* verify that all nodes agree that we are the recmaster */
2106 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2107 {
2108         struct ctdb_context *ctdb = rec->ctdb;
2109         struct verify_recmaster_data *rmdata;
2110         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2111         struct ctdb_client_control_state *state;
2112         enum monitor_result status;
2113         int j;
2114         
2115         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2116         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2117         rmdata->rec    = rec;
2118         rmdata->count  = 0;
2119         rmdata->pnn    = pnn;
2120         rmdata->status = MONITOR_OK;
2121
2122         /* loop over all active nodes and send an async getrecmaster call to 
2123            them*/
2124         for (j=0; j<nodemap->num; j++) {
2125                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2126                         continue;
2127                 }
2128                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2129                                         CONTROL_TIMEOUT(),
2130                                         nodemap->nodes[j].pnn);
2131                 if (state == NULL) {
2132                         /* we failed to send the control, treat this as 
2133                            an error and try again next iteration
2134                         */                      
2135                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2136                         talloc_free(mem_ctx);
2137                         return MONITOR_FAILED;
2138                 }
2139
2140                 /* set up the callback functions */
2141                 state->async.fn = verify_recmaster_callback;
2142                 state->async.private_data = rmdata;
2143
2144                 /* one more control to wait for to complete */
2145                 rmdata->count++;
2146         }
2147
2148
2149         /* now wait for up to the maximum number of seconds allowed
2150            or until all nodes we expect a response from has replied
2151         */
2152         while (rmdata->count > 0) {
2153                 event_loop_once(ctdb->ev);
2154         }
2155
2156         status = rmdata->status;
2157         talloc_free(mem_ctx);
2158         return status;
2159 }
2160
2161
2162 /* called to check that the allocation of public ip addresses is ok.
2163 */
2164 static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
2165 {
2166         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2167         struct ctdb_all_public_ips *ips = NULL;
2168         struct ctdb_uptime *uptime1 = NULL;
2169         struct ctdb_uptime *uptime2 = NULL;
2170         int ret, j;
2171
2172         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2173                                 CTDB_CURRENT_NODE, &uptime1);
2174         if (ret != 0) {
2175                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2176                 talloc_free(mem_ctx);
2177                 return -1;
2178         }
2179
2180         /* read the ip allocation from the local node */
2181         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2182         if (ret != 0) {
2183                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2184                 talloc_free(mem_ctx);
2185                 return -1;
2186         }
2187
2188         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2189                                 CTDB_CURRENT_NODE, &uptime2);
2190         if (ret != 0) {
2191                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2192                 talloc_free(mem_ctx);
2193                 return -1;
2194         }
2195
2196         /* skip the check if the startrecovery time has changed */
2197         if (timeval_compare(&uptime1->last_recovery_started,
2198                             &uptime2->last_recovery_started) != 0) {
2199                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2200                 talloc_free(mem_ctx);
2201                 return 0;
2202         }
2203
2204         /* skip the check if the endrecovery time has changed */
2205         if (timeval_compare(&uptime1->last_recovery_finished,
2206                             &uptime2->last_recovery_finished) != 0) {
2207                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2208                 talloc_free(mem_ctx);
2209                 return 0;
2210         }
2211
2212         /* skip the check if we have started but not finished recovery */
2213         if (timeval_compare(&uptime1->last_recovery_finished,
2214                             &uptime1->last_recovery_started) != 1) {
2215                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery. skipping public ip address check\n"));
2216                 talloc_free(mem_ctx);
2217
2218                 return 0;
2219         }
2220
2221         /* verify that we have the ip addresses we should have
2222            and we dont have ones we shouldnt have.
2223            if we find an inconsistency we set recmode to
2224            active on the local node and wait for the recmaster
2225            to do a full blown recovery
2226         */
2227         for (j=0; j<ips->num; j++) {
2228                 if (ips->ips[j].pnn == pnn) {
2229                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2230                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2231                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2232                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2233                                 if (ret != 0) {
2234                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2235
2236                                         talloc_free(mem_ctx);
2237                                         return -1;
2238                                 }
2239                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2240                                 if (ret != 0) {
2241                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2242
2243                                         talloc_free(mem_ctx);
2244                                         return -1;
2245                                 }
2246                         }
2247                 } else {
2248                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2249                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2250                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2251
2252                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2253                                 if (ret != 0) {
2254                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2255
2256                                         talloc_free(mem_ctx);
2257                                         return -1;
2258                                 }
2259                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2260                                 if (ret != 0) {
2261                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2262
2263                                         talloc_free(mem_ctx);
2264                                         return -1;
2265                                 }
2266                         }
2267                 }
2268         }
2269
2270         talloc_free(mem_ctx);
2271         return 0;
2272 }
2273
2274 /*
2275   the main monitoring loop
2276  */
2277 static void monitor_cluster(struct ctdb_context *ctdb)
2278 {
2279         uint32_t pnn;
2280         TALLOC_CTX *mem_ctx=NULL;
2281         struct ctdb_node_map *nodemap=NULL;
2282         struct ctdb_node_map *remote_nodemap=NULL;
2283         struct ctdb_vnn_map *vnnmap=NULL;
2284         struct ctdb_vnn_map *remote_vnnmap=NULL;
2285         int32_t debug_level;
2286         int i, j, ret;
2287         struct ctdb_recoverd *rec;
2288         char c;
2289
2290         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2291
2292         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2293         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2294
2295         rec->ctdb = ctdb;
2296         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
2297         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
2298
2299         rec->priority_time = timeval_current();
2300
2301         /* register a message port for sending memory dumps */
2302         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2303
2304         /* register a message port for recovery elections */
2305         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2306
2307         /* when nodes are disabled/enabled */
2308         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2309
2310         /* when we are asked to puch out a flag change */
2311         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2312
2313         /* when nodes are banned */
2314         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
2315
2316         /* and one for when nodes are unbanned */
2317         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
2318
2319         /* register a message port for vacuum fetch */
2320         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2321
2322 again:
2323         if (mem_ctx) {
2324                 talloc_free(mem_ctx);
2325                 mem_ctx = NULL;
2326         }
2327         mem_ctx = talloc_new(ctdb);
2328         if (!mem_ctx) {
2329                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2330                 exit(-1);
2331         }
2332
2333         /* we only check for recovery once every second */
2334         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2335
2336         /* verify that the main daemon is still running */
2337         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2338                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2339                 exit(-1);
2340         }
2341
2342         /* ping the local daemon to tell it we are alive */
2343         ctdb_ctrl_recd_ping(ctdb);
2344
2345         if (rec->election_timeout) {
2346                 /* an election is in progress */
2347                 goto again;
2348         }
2349
2350         /* read the debug level from the parent and update locally */
2351         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2352         if (ret !=0) {
2353                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2354                 goto again;
2355         }
2356         LogLevel = debug_level;
2357
2358
2359         /* We must check if we need to ban a node here but we want to do this
2360            as early as possible so we dont wait until we have pulled the node
2361            map from the local node. thats why we have the hardcoded value 20
2362         */
2363         if (rec->culprit_counter > 20) {
2364                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
2365                          rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
2366                          ctdb->tunable.recovery_ban_period));
2367                 ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
2368         }
2369
2370         /* get relevant tunables */
2371         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2372         if (ret != 0) {
2373                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2374                 goto again;
2375         }
2376
2377         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2378         if (pnn == (uint32_t)-1) {
2379                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2380                 goto again;
2381         }
2382
2383         /* get the vnnmap */
2384         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2385         if (ret != 0) {
2386                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2387                 goto again;
2388         }
2389
2390
2391         /* get number of nodes */
2392         if (rec->nodemap) {
2393                 talloc_free(rec->nodemap);
2394                 rec->nodemap = NULL;
2395                 nodemap=NULL;
2396         }
2397         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2398         if (ret != 0) {
2399                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2400                 goto again;
2401         }
2402         nodemap = rec->nodemap;
2403
2404         /* check which node is the recovery master */
2405         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2406         if (ret != 0) {
2407                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2408                 goto again;
2409         }
2410
2411         if (rec->recmaster == (uint32_t)-1) {
2412                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2413                 force_election(rec, pnn, nodemap);
2414                 goto again;
2415         }
2416         
2417         /* check that we (recovery daemon) and the local ctdb daemon
2418            agrees on whether we are banned or not
2419         */
2420         if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
2421                 if (rec->banned_nodes[pnn] == NULL) {
2422                         if (rec->recmaster == pnn) {
2423                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2424
2425                                 ctdb_unban_node(rec, pnn);
2426                         } else {
2427                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2428                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2429                                 ctdb_set_culprit(rec, pnn);
2430                         }
2431                         goto again;
2432                 }
2433         } else {
2434                 if (rec->banned_nodes[pnn] != NULL) {
2435                         if (rec->recmaster == pnn) {
2436                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2437
2438                                 ctdb_unban_node(rec, pnn);
2439                         } else {
2440                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2441
2442                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2443                                 ctdb_set_culprit(rec, pnn);
2444                         }
2445                         goto again;
2446                 }
2447         }
2448
2449         /* remember our own node flags */
2450         rec->node_flags = nodemap->nodes[pnn].flags;
2451
2452         /* count how many active nodes there are */
2453         rec->num_active    = 0;
2454         rec->num_connected = 0;
2455         for (i=0; i<nodemap->num; i++) {
2456                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2457                         rec->num_active++;
2458                 }
2459                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2460                         rec->num_connected++;
2461                 }
2462         }
2463
2464
2465         /* verify that the recmaster node is still active */
2466         for (j=0; j<nodemap->num; j++) {
2467                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2468                         break;
2469                 }
2470         }
2471
2472         if (j == nodemap->num) {
2473                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2474                 force_election(rec, pnn, nodemap);
2475                 goto again;
2476         }
2477
2478         /* if recovery master is disconnected we must elect a new recmaster */
2479         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2480                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2481                 force_election(rec, pnn, nodemap);
2482                 goto again;
2483         }
2484
2485         /* grap the nodemap from the recovery master to check if it is banned */
2486         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2487                                    mem_ctx, &remote_nodemap);
2488         if (ret != 0) {
2489                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2490                           nodemap->nodes[j].pnn));
2491                 goto again;
2492         }
2493
2494
2495         if (remote_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2496                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2497                 force_election(rec, pnn, nodemap);
2498                 goto again;
2499         }
2500
2501
2502         /* verify that we and the recmaster agrees on our flags */
2503         if (nodemap->nodes[pnn].flags != remote_nodemap->nodes[pnn].flags) {
2504                 DEBUG(DEBUG_ERR, (__location__ " Recmaster disagrees on our flags flags:0x%x recmaster_flags:0x%x  Broadcasting out flags.\n", nodemap->nodes[pnn].flags, remote_nodemap->nodes[pnn].flags));
2505
2506                 update_our_flags_on_all_nodes(ctdb, pnn, nodemap);
2507         }
2508
2509
2510         /* verify that we have all ip addresses we should have and we dont
2511          * have addresses we shouldnt have.
2512          */ 
2513         if (ctdb->do_checkpublicip) {
2514                 if (verify_ip_allocation(ctdb, pnn) != 0) {
2515                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
2516                         goto again;
2517                 }
2518         }
2519
2520
2521         /* if we are not the recmaster then we do not need to check
2522            if recovery is needed
2523          */
2524         if (pnn != rec->recmaster) {
2525                 goto again;
2526         }
2527
2528
2529         /* ensure our local copies of flags are right */
2530         ret = update_local_flags(rec, nodemap);
2531         if (ret == MONITOR_ELECTION_NEEDED) {
2532                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2533                 force_election(rec, pnn, nodemap);
2534                 goto again;
2535         }
2536         if (ret != MONITOR_OK) {
2537                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2538                 goto again;
2539         }
2540
2541         /* update the list of public ips that a node can handle for
2542            all connected nodes
2543         */
2544         if (ctdb->num_nodes != nodemap->num) {
2545                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2546                 reload_nodes_file(ctdb);
2547                 goto again;
2548         }
2549         for (j=0; j<nodemap->num; j++) {
2550                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2551                         continue;
2552                 }
2553                 /* release any existing data */
2554                 if (ctdb->nodes[j]->public_ips) {
2555                         talloc_free(ctdb->nodes[j]->public_ips);
2556                         ctdb->nodes[j]->public_ips = NULL;
2557                 }
2558                 /* grab a new shiny list of public ips from the node */
2559                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
2560                         ctdb->nodes[j]->pnn, 
2561                         ctdb->nodes,
2562                         &ctdb->nodes[j]->public_ips)) {
2563                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
2564                                 ctdb->nodes[j]->pnn));
2565                         goto again;
2566                 }
2567         }
2568
2569
2570         /* verify that all active nodes agree that we are the recmaster */
2571         switch (verify_recmaster(rec, nodemap, pnn)) {
2572         case MONITOR_RECOVERY_NEEDED:
2573                 /* can not happen */
2574                 goto again;
2575         case MONITOR_ELECTION_NEEDED:
2576                 force_election(rec, pnn, nodemap);
2577                 goto again;
2578         case MONITOR_OK:
2579                 break;
2580         case MONITOR_FAILED:
2581                 goto again;
2582         }
2583
2584
2585         if (rec->need_recovery) {
2586                 /* a previous recovery didn't finish */
2587                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, -1);
2588                 goto again;             
2589         }
2590
2591         /* verify that all active nodes are in normal mode 
2592            and not in recovery mode 
2593          */
2594         switch (verify_recmode(ctdb, nodemap)) {
2595         case MONITOR_RECOVERY_NEEDED:
2596                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2597                 goto again;
2598         case MONITOR_FAILED:
2599                 goto again;
2600         case MONITOR_ELECTION_NEEDED:
2601                 /* can not happen */
2602         case MONITOR_OK:
2603                 break;
2604         }
2605
2606
2607         /* we should have the reclock - check its not stale */
2608         if (ctdb->recovery_lock_fd == -1) {
2609                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2610                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2611                 goto again;
2612         }
2613
2614         if (pread(ctdb->recovery_lock_fd, &c, 1, 0) == -1) {
2615                 DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2616                 close(ctdb->recovery_lock_fd);
2617                 ctdb->recovery_lock_fd = -1;
2618                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2619                 goto again;
2620         }
2621
2622         /* get the nodemap for all active remote nodes and verify
2623            they are the same as for this node
2624          */
2625         for (j=0; j<nodemap->num; j++) {
2626                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2627                         continue;
2628                 }
2629                 if (nodemap->nodes[j].pnn == pnn) {
2630                         continue;
2631                 }
2632
2633                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2634                                            mem_ctx, &remote_nodemap);
2635                 if (ret != 0) {
2636                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
2637                                   nodemap->nodes[j].pnn));
2638                         goto again;
2639                 }
2640
2641                 /* if the nodes disagree on how many nodes there are
2642                    then this is a good reason to try recovery
2643                  */
2644                 if (remote_nodemap->num != nodemap->num) {
2645                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2646                                   nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
2647                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2648                         goto again;
2649                 }
2650
2651                 /* if the nodes disagree on which nodes exist and are
2652                    active, then that is also a good reason to do recovery
2653                  */
2654                 for (i=0;i<nodemap->num;i++) {
2655                         if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
2656                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2657                                           nodemap->nodes[j].pnn, i, 
2658                                           remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
2659                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2660                                             vnnmap, nodemap->nodes[j].pnn);
2661                                 goto again;
2662                         }
2663                         if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
2664                             (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2665                                 DEBUG(DEBUG_WARNING, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
2666                                           nodemap->nodes[j].pnn, i,
2667                                           remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
2668                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2669                                             vnnmap, nodemap->nodes[j].pnn);
2670                                 goto again;
2671                         }
2672                 }
2673
2674         }
2675
2676
2677         /* there better be the same number of lmasters in the vnn map
2678            as there are active nodes or we will have to do a recovery
2679          */
2680         if (vnnmap->size != rec->num_active) {
2681                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
2682                           vnnmap->size, rec->num_active));
2683                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2684                 goto again;
2685         }
2686
2687         /* verify that all active nodes in the nodemap also exist in 
2688            the vnnmap.
2689          */
2690         for (j=0; j<nodemap->num; j++) {
2691                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2692                         continue;
2693                 }
2694                 if (nodemap->nodes[j].pnn == pnn) {
2695                         continue;
2696                 }
2697
2698                 for (i=0; i<vnnmap->size; i++) {
2699                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2700                                 break;
2701                         }
2702                 }
2703                 if (i == vnnmap->size) {
2704                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
2705                                   nodemap->nodes[j].pnn));
2706                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2707                         goto again;
2708                 }
2709         }
2710
2711         
2712         /* verify that all other nodes have the same vnnmap
2713            and are from the same generation
2714          */
2715         for (j=0; j<nodemap->num; j++) {
2716                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2717                         continue;
2718                 }
2719                 if (nodemap->nodes[j].pnn == pnn) {
2720                         continue;
2721                 }
2722
2723                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2724                                           mem_ctx, &remote_vnnmap);
2725                 if (ret != 0) {
2726                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
2727                                   nodemap->nodes[j].pnn));
2728                         goto again;
2729                 }
2730
2731                 /* verify the vnnmap generation is the same */
2732                 if (vnnmap->generation != remote_vnnmap->generation) {
2733                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
2734                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
2735                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2736                         goto again;
2737                 }
2738
2739                 /* verify the vnnmap size is the same */
2740                 if (vnnmap->size != remote_vnnmap->size) {
2741                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
2742                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
2743                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2744                         goto again;
2745                 }
2746
2747                 /* verify the vnnmap is the same */
2748                 for (i=0;i<vnnmap->size;i++) {
2749                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
2750                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
2751                                           nodemap->nodes[j].pnn));
2752                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2753                                             vnnmap, nodemap->nodes[j].pnn);
2754                                 goto again;
2755                         }
2756                 }
2757         }
2758
2759         /* we might need to change who has what IP assigned */
2760         if (rec->need_takeover_run) {
2761                 rec->need_takeover_run = false;
2762
2763                 /* execute the "startrecovery" event script on all nodes */
2764                 ret = run_startrecovery_eventscript(rec, nodemap);
2765                 if (ret!=0) {
2766                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
2767                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2768                                     vnnmap, ctdb->pnn);
2769                 }
2770
2771                 ret = ctdb_takeover_run(ctdb, nodemap);
2772                 if (ret != 0) {
2773                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
2774                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2775                                     vnnmap, ctdb->pnn);
2776                 }
2777
2778                 /* execute the "recovered" event script on all nodes */
2779                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
2780 #if 0
2781 // we cant check whether the event completed successfully
2782 // since this script WILL fail if the node is in recovery mode
2783 // and if that race happens, the code here would just cause a second
2784 // cascading recovery.
2785                 if (ret!=0) {
2786                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
2787                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2788                                     vnnmap, ctdb->pnn);
2789                 }
2790 #endif
2791         }
2792
2793
2794         goto again;
2795
2796 }
2797
2798 /*
2799   event handler for when the main ctdbd dies
2800  */
2801 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
2802                                  uint16_t flags, void *private_data)
2803 {
2804         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
2805         _exit(1);
2806 }
2807
2808 /*
2809   called regularly to verify that the recovery daemon is still running
2810  */
2811 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
2812                               struct timeval yt, void *p)
2813 {
2814         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
2815
2816         if (kill(ctdb->recoverd_pid, 0) != 0) {
2817                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
2818
2819                 ctdb_stop_recoverd(ctdb);
2820                 ctdb_stop_keepalive(ctdb);
2821                 ctdb_stop_monitoring(ctdb);
2822                 ctdb_release_all_ips(ctdb);
2823                 if (ctdb->methods != NULL) {
2824                         ctdb->methods->shutdown(ctdb);
2825                 }
2826                 ctdb_event_script(ctdb, "shutdown");
2827
2828                 exit(10);       
2829         }
2830
2831         event_add_timed(ctdb->ev, ctdb, 
2832                         timeval_current_ofs(30, 0),
2833                         ctdb_check_recd, ctdb);
2834 }
2835
2836 static void recd_sig_child_handler(struct event_context *ev,
2837         struct signal_event *se, int signum, int count,
2838         void *dont_care, 
2839         void *private_data)
2840 {
2841 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
2842         int status;
2843         pid_t pid = -1;
2844
2845         while (pid != 0) {
2846                 pid = waitpid(-1, &status, WNOHANG);
2847                 if (pid == -1) {
2848                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
2849                         return;
2850                 }
2851                 if (pid > 0) {
2852                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
2853                 }
2854         }
2855 }
2856
2857 /*
2858   startup the recovery daemon as a child of the main ctdb daemon
2859  */
2860 int ctdb_start_recoverd(struct ctdb_context *ctdb)
2861 {
2862         int ret;
2863         int fd[2];
2864         struct signal_event *se;
2865
2866         if (pipe(fd) != 0) {
2867                 return -1;
2868         }
2869
2870         ctdb->ctdbd_pid = getpid();
2871
2872         ctdb->recoverd_pid = fork();
2873         if (ctdb->recoverd_pid == -1) {
2874                 return -1;
2875         }
2876         
2877         if (ctdb->recoverd_pid != 0) {
2878                 close(fd[0]);
2879                 event_add_timed(ctdb->ev, ctdb, 
2880                                 timeval_current_ofs(30, 0),
2881                                 ctdb_check_recd, ctdb);
2882                 return 0;
2883         }
2884
2885         close(fd[1]);
2886
2887         /* shutdown the transport */
2888         if (ctdb->methods) {
2889                 ctdb->methods->shutdown(ctdb);
2890         }
2891
2892         /* get a new event context */
2893         talloc_free(ctdb->ev);
2894         ctdb->ev = event_context_init(ctdb);
2895
2896         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
2897                      ctdb_recoverd_parent, &fd[0]);     
2898
2899         close(ctdb->daemon.sd);
2900         ctdb->daemon.sd = -1;
2901
2902         srandom(getpid() ^ time(NULL));
2903
2904         /* the recovery daemon does not need to be realtime */
2905         if (ctdb->do_setsched) {
2906                 ctdb_restore_scheduler(ctdb);
2907         }
2908
2909         /* initialise ctdb */
2910         ret = ctdb_socket_connect(ctdb);
2911         if (ret != 0) {
2912                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb\n"));
2913                 exit(1);
2914         }
2915
2916         /* set up a handler to pick up sigchld */
2917         se = event_add_signal(ctdb->ev, ctdb,
2918                                      SIGCHLD, 0,
2919                                      recd_sig_child_handler,
2920                                      ctdb);
2921         if (se == NULL) {
2922                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
2923                 exit(1);
2924         }
2925
2926         monitor_cluster(ctdb);
2927
2928         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
2929         return -1;
2930 }
2931
2932 /*
2933   shutdown the recovery daemon
2934  */
2935 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
2936 {
2937         if (ctdb->recoverd_pid == 0) {
2938                 return;
2939         }
2940
2941         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
2942         kill(ctdb->recoverd_pid, SIGTERM);
2943 }