4d533f9bdf40c12cbbc816b37bec033b23f67864
[sahlberg/ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 struct ban_state {
35         struct ctdb_recoverd *rec;
36         uint32_t banned_node;
37 };
38
39 /*
40   private state of recovery daemon
41  */
42 struct ctdb_recoverd {
43         struct ctdb_context *ctdb;
44         uint32_t recmaster;
45         uint32_t num_active;
46         uint32_t num_connected;
47         struct ctdb_node_map *nodemap;
48         uint32_t last_culprit;
49         uint32_t culprit_counter;
50         struct timeval first_recover_time;
51         struct ban_state **banned_nodes;
52         struct timeval priority_time;
53         bool need_takeover_run;
54         bool need_recovery;
55         uint32_t node_flags;
56         struct timed_event *send_election_te;
57         struct timed_event *election_timeout;
58         struct vacuum_info *vacuum_info;
59 };
60
61 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
62 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
63
64
65 /*
66   unban a node
67  */
68 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
69 {
70         struct ctdb_context *ctdb = rec->ctdb;
71
72         DEBUG(DEBUG_NOTICE,("Unbanning node %u\n", pnn));
73
74         if (!ctdb_validate_pnn(ctdb, pnn)) {
75                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_unban_node\n", pnn));
76                 return;
77         }
78
79         /* If we are unbanning a different node then just pass the ban info on */
80         if (pnn != ctdb->pnn) {
81                 TDB_DATA data;
82                 int ret;
83                 
84                 DEBUG(DEBUG_NOTICE,("Unanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
85
86                 data.dptr = (uint8_t *)&pnn;
87                 data.dsize = sizeof(uint32_t);
88
89                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_UNBAN_NODE, data);
90                 if (ret != 0) {
91                         DEBUG(DEBUG_ERR,("Failed to unban node %u\n", pnn));
92                         return;
93                 }
94
95                 return;
96         }
97
98         /* make sure we remember we are no longer banned in case 
99            there is an election */
100         rec->node_flags &= ~NODE_FLAGS_BANNED;
101
102         DEBUG(DEBUG_INFO,("Clearing ban flag on node %u\n", pnn));
103         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
104
105         if (rec->banned_nodes[pnn] == NULL) {
106                 DEBUG(DEBUG_INFO,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
107                 return;
108         }
109
110         talloc_free(rec->banned_nodes[pnn]);
111         rec->banned_nodes[pnn] = NULL;
112 }
113
114
115 /*
116   called when a ban has timed out
117  */
118 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
119 {
120         struct ban_state *state = talloc_get_type(p, struct ban_state);
121         struct ctdb_recoverd *rec = state->rec;
122         uint32_t pnn = state->banned_node;
123
124         DEBUG(DEBUG_NOTICE,("Ban timeout. Node %u is now unbanned\n", pnn));
125         ctdb_unban_node(rec, pnn);
126 }
127
128 /*
129   ban a node for a period of time
130  */
131 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
132 {
133         struct ctdb_context *ctdb = rec->ctdb;
134
135         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
136
137         if (!ctdb_validate_pnn(ctdb, pnn)) {
138                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
139                 return;
140         }
141
142         if (0 == ctdb->tunable.enable_bans) {
143                 DEBUG(DEBUG_INFO,("Bans are disabled - ignoring ban of node %u\n", pnn));
144                 return;
145         }
146
147         /* If we are banning a different node then just pass the ban info on */
148         if (pnn != ctdb->pnn) {
149                 struct ctdb_ban_info b;
150                 TDB_DATA data;
151                 int ret;
152                 
153                 DEBUG(DEBUG_NOTICE,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
154
155                 b.pnn = pnn;
156                 b.ban_time = ban_time;
157
158                 data.dptr = (uint8_t *)&b;
159                 data.dsize = sizeof(b);
160
161                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_BAN_NODE, data);
162                 if (ret != 0) {
163                         DEBUG(DEBUG_ERR,("Failed to ban node %u\n", pnn));
164                         return;
165                 }
166
167                 return;
168         }
169
170         DEBUG(DEBUG_NOTICE,("self ban - lowering our election priority\n"));
171         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
172
173         /* banning ourselves - lower our election priority */
174         rec->priority_time = timeval_current();
175
176         /* make sure we remember we are banned in case there is an 
177            election */
178         rec->node_flags |= NODE_FLAGS_BANNED;
179
180         if (rec->banned_nodes[pnn] != NULL) {
181                 DEBUG(DEBUG_NOTICE,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));            
182                 talloc_free(rec->banned_nodes[pnn]);
183                 rec->banned_nodes[pnn] = NULL;
184         }
185
186         rec->banned_nodes[pnn] = talloc(rec->banned_nodes, struct ban_state);
187         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
188
189         rec->banned_nodes[pnn]->rec = rec;
190         rec->banned_nodes[pnn]->banned_node = pnn;
191
192         if (ban_time != 0) {
193                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
194                                 timeval_current_ofs(ban_time, 0),
195                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
196         }
197 }
198
199 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
200
201
202 /*
203   run the "recovered" eventscript on all nodes
204  */
205 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
206 {
207         TALLOC_CTX *tmp_ctx;
208         uint32_t *nodes;
209
210         tmp_ctx = talloc_new(ctdb);
211         CTDB_NO_MEMORY(ctdb, tmp_ctx);
212
213         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
214         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
215                                         nodes,
216                                         CONTROL_TIMEOUT(), false, tdb_null,
217                                         NULL, NULL,
218                                         NULL) != 0) {
219                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
220
221                 talloc_free(tmp_ctx);
222                 return -1;
223         }
224
225         talloc_free(tmp_ctx);
226         return 0;
227 }
228
229 /*
230   remember the trouble maker
231  */
232 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
233 {
234         struct ctdb_context *ctdb = rec->ctdb;
235
236         if (rec->last_culprit != culprit ||
237             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
238                 DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
239                 /* either a new node is the culprit, or we've decided to forgive them */
240                 rec->last_culprit = culprit;
241                 rec->first_recover_time = timeval_current();
242                 rec->culprit_counter = 0;
243         }
244         rec->culprit_counter++;
245 }
246
247
248 /* this callback is called for every node that failed to execute the
249    start recovery event
250 */
251 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
252 {
253         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
254
255         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
256
257         ctdb_set_culprit(rec, node_pnn);
258 }
259
260 /*
261   run the "startrecovery" eventscript on all nodes
262  */
263 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
264 {
265         TALLOC_CTX *tmp_ctx;
266         uint32_t *nodes;
267         struct ctdb_context *ctdb = rec->ctdb;
268
269         tmp_ctx = talloc_new(ctdb);
270         CTDB_NO_MEMORY(ctdb, tmp_ctx);
271
272         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
273         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
274                                         nodes,
275                                         CONTROL_TIMEOUT(), false, tdb_null,
276                                         NULL,
277                                         startrecovery_fail_callback,
278                                         rec) != 0) {
279                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
280                 talloc_free(tmp_ctx);
281                 return -1;
282         }
283
284         talloc_free(tmp_ctx);
285         return 0;
286 }
287
288 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
289 {
290         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
291                 DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
292                 return;
293         }
294         if (node_pnn < ctdb->num_nodes) {
295                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
296         }
297 }
298
299 /*
300   update the node capabilities for all connected nodes
301  */
302 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
303 {
304         uint32_t *nodes;
305         TALLOC_CTX *tmp_ctx;
306
307         tmp_ctx = talloc_new(ctdb);
308         CTDB_NO_MEMORY(ctdb, tmp_ctx);
309
310         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
311         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
312                                         nodes, CONTROL_TIMEOUT(),
313                                         false, tdb_null,
314                                         async_getcap_callback, NULL,
315                                         NULL) != 0) {
316                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
317                 talloc_free(tmp_ctx);
318                 return -1;
319         }
320
321         talloc_free(tmp_ctx);
322         return 0;
323 }
324
325 /*
326   change recovery mode on all nodes
327  */
328 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
329 {
330         TDB_DATA data;
331         uint32_t *nodes;
332         TALLOC_CTX *tmp_ctx;
333
334         tmp_ctx = talloc_new(ctdb);
335         CTDB_NO_MEMORY(ctdb, tmp_ctx);
336
337         /* freeze all nodes */
338         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
339         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
340                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
341                                                 nodes, CONTROL_TIMEOUT(),
342                                                 false, tdb_null,
343                                                 NULL, NULL,
344                                                 NULL) != 0) {
345                         DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
346                         talloc_free(tmp_ctx);
347                         return -1;
348                 }
349         }
350
351
352         data.dsize = sizeof(uint32_t);
353         data.dptr = (unsigned char *)&rec_mode;
354
355         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
356                                         nodes, CONTROL_TIMEOUT(),
357                                         false, data,
358                                         NULL, NULL,
359                                         NULL) != 0) {
360                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
361                 talloc_free(tmp_ctx);
362                 return -1;
363         }
364
365         talloc_free(tmp_ctx);
366         return 0;
367 }
368
369 /*
370   change recovery master on all node
371  */
372 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
373 {
374         TDB_DATA data;
375         TALLOC_CTX *tmp_ctx;
376         uint32_t *nodes;
377
378         tmp_ctx = talloc_new(ctdb);
379         CTDB_NO_MEMORY(ctdb, tmp_ctx);
380
381         data.dsize = sizeof(uint32_t);
382         data.dptr = (unsigned char *)&pnn;
383
384         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
385         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
386                                         nodes,
387                                         CONTROL_TIMEOUT(), false, data,
388                                         NULL, NULL,
389                                         NULL) != 0) {
390                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
391                 talloc_free(tmp_ctx);
392                 return -1;
393         }
394
395         talloc_free(tmp_ctx);
396         return 0;
397 }
398
399
400 /*
401   ensure all other nodes have attached to any databases that we have
402  */
403 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
404                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
405 {
406         int i, j, db, ret;
407         struct ctdb_dbid_map *remote_dbmap;
408
409         /* verify that all other nodes have all our databases */
410         for (j=0; j<nodemap->num; j++) {
411                 /* we dont need to ourself ourselves */
412                 if (nodemap->nodes[j].pnn == pnn) {
413                         continue;
414                 }
415                 /* dont check nodes that are unavailable */
416                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
417                         continue;
418                 }
419
420                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
421                                          mem_ctx, &remote_dbmap);
422                 if (ret != 0) {
423                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
424                         return -1;
425                 }
426
427                 /* step through all local databases */
428                 for (db=0; db<dbmap->num;db++) {
429                         const char *name;
430
431
432                         for (i=0;i<remote_dbmap->num;i++) {
433                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
434                                         break;
435                                 }
436                         }
437                         /* the remote node already have this database */
438                         if (i!=remote_dbmap->num) {
439                                 continue;
440                         }
441                         /* ok so we need to create this database */
442                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
443                                             mem_ctx, &name);
444                         if (ret != 0) {
445                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
446                                 return -1;
447                         }
448                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
449                                            mem_ctx, name, dbmap->dbs[db].persistent);
450                         if (ret != 0) {
451                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
452                                 return -1;
453                         }
454                 }
455         }
456
457         return 0;
458 }
459
460
461 /*
462   ensure we are attached to any databases that anyone else is attached to
463  */
464 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
465                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
466 {
467         int i, j, db, ret;
468         struct ctdb_dbid_map *remote_dbmap;
469
470         /* verify that we have all database any other node has */
471         for (j=0; j<nodemap->num; j++) {
472                 /* we dont need to ourself ourselves */
473                 if (nodemap->nodes[j].pnn == pnn) {
474                         continue;
475                 }
476                 /* dont check nodes that are unavailable */
477                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
478                         continue;
479                 }
480
481                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
482                                          mem_ctx, &remote_dbmap);
483                 if (ret != 0) {
484                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
485                         return -1;
486                 }
487
488                 /* step through all databases on the remote node */
489                 for (db=0; db<remote_dbmap->num;db++) {
490                         const char *name;
491
492                         for (i=0;i<(*dbmap)->num;i++) {
493                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
494                                         break;
495                                 }
496                         }
497                         /* we already have this db locally */
498                         if (i!=(*dbmap)->num) {
499                                 continue;
500                         }
501                         /* ok so we need to create this database and
502                            rebuild dbmap
503                          */
504                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
505                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
506                         if (ret != 0) {
507                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
508                                           nodemap->nodes[j].pnn));
509                                 return -1;
510                         }
511                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
512                                            remote_dbmap->dbs[db].persistent);
513                         if (ret != 0) {
514                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
515                                 return -1;
516                         }
517                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
518                         if (ret != 0) {
519                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
520                                 return -1;
521                         }
522                 }
523         }
524
525         return 0;
526 }
527
528
529 /*
530   pull the remote database contents from one node into the recdb
531  */
532 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
533                                     struct tdb_wrap *recdb, uint32_t dbid)
534 {
535         int ret;
536         TDB_DATA outdata;
537         struct ctdb_marshall_buffer *reply;
538         struct ctdb_rec_data *rec;
539         int i;
540         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
541
542         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
543                                CONTROL_TIMEOUT(), &outdata);
544         if (ret != 0) {
545                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
546                 talloc_free(tmp_ctx);
547                 return -1;
548         }
549
550         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
551
552         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
553                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
554                 talloc_free(tmp_ctx);
555                 return -1;
556         }
557         
558         rec = (struct ctdb_rec_data *)&reply->data[0];
559         
560         for (i=0;
561              i<reply->count;
562              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
563                 TDB_DATA key, data;
564                 struct ctdb_ltdb_header *hdr;
565                 TDB_DATA existing;
566                 
567                 key.dptr = &rec->data[0];
568                 key.dsize = rec->keylen;
569                 data.dptr = &rec->data[key.dsize];
570                 data.dsize = rec->datalen;
571                 
572                 hdr = (struct ctdb_ltdb_header *)data.dptr;
573
574                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
575                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
576                         talloc_free(tmp_ctx);
577                         return -1;
578                 }
579
580                 /* fetch the existing record, if any */
581                 existing = tdb_fetch(recdb->tdb, key);
582                 
583                 if (existing.dptr != NULL) {
584                         struct ctdb_ltdb_header header;
585                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
586                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
587                                          (unsigned)existing.dsize, srcnode));
588                                 free(existing.dptr);
589                                 talloc_free(tmp_ctx);
590                                 return -1;
591                         }
592                         header = *(struct ctdb_ltdb_header *)existing.dptr;
593                         free(existing.dptr);
594                         if (!(header.rsn < hdr->rsn ||
595                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
596                                 continue;
597                         }
598                 }
599                 
600                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
601                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
602                         talloc_free(tmp_ctx);
603                         return -1;                              
604                 }
605         }
606
607         talloc_free(tmp_ctx);
608
609         return 0;
610 }
611
612 /*
613   pull all the remote database contents into the recdb
614  */
615 static int pull_remote_database(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
616                                 struct tdb_wrap *recdb, uint32_t dbid)
617 {
618         int j;
619
620         /* pull all records from all other nodes across onto this node
621            (this merges based on rsn)
622         */
623         for (j=0; j<nodemap->num; j++) {
624                 /* dont merge from nodes that are unavailable */
625                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
626                         continue;
627                 }
628                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
629                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
630                                  nodemap->nodes[j].pnn));
631                         return -1;
632                 }
633         }
634         
635         return 0;
636 }
637
638
639 /*
640   update flags on all active nodes
641  */
642 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
643 {
644         int ret;
645
646         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
647                 if (ret != 0) {
648                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
649                 return -1;
650         }
651
652         return 0;
653 }
654
655 /*
656   ensure all nodes have the same vnnmap we do
657  */
658 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
659                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
660 {
661         int j, ret;
662
663         /* push the new vnn map out to all the nodes */
664         for (j=0; j<nodemap->num; j++) {
665                 /* dont push to nodes that are unavailable */
666                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
667                         continue;
668                 }
669
670                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
671                 if (ret != 0) {
672                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
673                         return -1;
674                 }
675         }
676
677         return 0;
678 }
679
680
681 /*
682   handler for when the admin bans a node
683 */
684 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
685                         TDB_DATA data, void *private_data)
686 {
687         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
688         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
689         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
690
691         if (data.dsize != sizeof(*b)) {
692                 DEBUG(DEBUG_ERR,("Bad data in ban_handler\n"));
693                 talloc_free(mem_ctx);
694                 return;
695         }
696
697         if (b->pnn != ctdb->pnn) {
698                 DEBUG(DEBUG_ERR,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
699                 return;
700         }
701
702         DEBUG(DEBUG_NOTICE,("Node %u has been banned for %u seconds\n", 
703                  b->pnn, b->ban_time));
704
705         ctdb_ban_node(rec, b->pnn, b->ban_time);
706         talloc_free(mem_ctx);
707 }
708
709 /*
710   handler for when the admin unbans a node
711 */
712 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
713                           TDB_DATA data, void *private_data)
714 {
715         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
716         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
717         uint32_t pnn;
718
719         if (data.dsize != sizeof(uint32_t)) {
720                 DEBUG(DEBUG_ERR,("Bad data in unban_handler\n"));
721                 talloc_free(mem_ctx);
722                 return;
723         }
724         pnn = *(uint32_t *)data.dptr;
725
726         if (pnn != ctdb->pnn) {
727                 DEBUG(DEBUG_ERR,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
728                 return;
729         }
730
731         DEBUG(DEBUG_NOTICE,("Node %u has been unbanned.\n", pnn));
732         ctdb_unban_node(rec, pnn);
733         talloc_free(mem_ctx);
734 }
735
736
737 struct vacuum_info {
738         struct vacuum_info *next, *prev;
739         struct ctdb_recoverd *rec;
740         uint32_t srcnode;
741         struct ctdb_db_context *ctdb_db;
742         struct ctdb_marshall_buffer *recs;
743         struct ctdb_rec_data *r;
744 };
745
746 static void vacuum_fetch_next(struct vacuum_info *v);
747
748 /*
749   called when a vacuum fetch has completed - just free it and do the next one
750  */
751 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
752 {
753         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
754         talloc_free(state);
755         vacuum_fetch_next(v);
756 }
757
758
759 /*
760   process the next element from the vacuum list
761 */
762 static void vacuum_fetch_next(struct vacuum_info *v)
763 {
764         struct ctdb_call call;
765         struct ctdb_rec_data *r;
766
767         while (v->recs->count) {
768                 struct ctdb_client_call_state *state;
769                 TDB_DATA data;
770                 struct ctdb_ltdb_header *hdr;
771
772                 ZERO_STRUCT(call);
773                 call.call_id = CTDB_NULL_FUNC;
774                 call.flags = CTDB_IMMEDIATE_MIGRATION;
775
776                 r = v->r;
777                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
778                 v->recs->count--;
779
780                 call.key.dptr = &r->data[0];
781                 call.key.dsize = r->keylen;
782
783                 /* ensure we don't block this daemon - just skip a record if we can't get
784                    the chainlock */
785                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
786                         continue;
787                 }
788
789                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
790                 if (data.dptr == NULL) {
791                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
792                         continue;
793                 }
794
795                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
796                         free(data.dptr);
797                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
798                         continue;
799                 }
800                 
801                 hdr = (struct ctdb_ltdb_header *)data.dptr;
802                 if (hdr->dmaster == v->rec->ctdb->pnn) {
803                         /* its already local */
804                         free(data.dptr);
805                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
806                         continue;
807                 }
808
809                 free(data.dptr);
810
811                 state = ctdb_call_send(v->ctdb_db, &call);
812                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
813                 if (state == NULL) {
814                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
815                         talloc_free(v);
816                         return;
817                 }
818                 state->async.fn = vacuum_fetch_callback;
819                 state->async.private_data = v;
820                 return;
821         }
822
823         talloc_free(v);
824 }
825
826
827 /*
828   destroy a vacuum info structure
829  */
830 static int vacuum_info_destructor(struct vacuum_info *v)
831 {
832         DLIST_REMOVE(v->rec->vacuum_info, v);
833         return 0;
834 }
835
836
837 /*
838   handler for vacuum fetch
839 */
840 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
841                                  TDB_DATA data, void *private_data)
842 {
843         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
844         struct ctdb_marshall_buffer *recs;
845         int ret, i;
846         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
847         const char *name;
848         struct ctdb_dbid_map *dbmap=NULL;
849         bool persistent = false;
850         struct ctdb_db_context *ctdb_db;
851         struct ctdb_rec_data *r;
852         uint32_t srcnode;
853         struct vacuum_info *v;
854
855         recs = (struct ctdb_marshall_buffer *)data.dptr;
856         r = (struct ctdb_rec_data *)&recs->data[0];
857
858         if (recs->count == 0) {
859                 talloc_free(tmp_ctx);
860                 return;
861         }
862
863         srcnode = r->reqid;
864
865         for (v=rec->vacuum_info;v;v=v->next) {
866                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
867                         /* we're already working on records from this node */
868                         talloc_free(tmp_ctx);
869                         return;
870                 }
871         }
872
873         /* work out if the database is persistent */
874         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
875         if (ret != 0) {
876                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
877                 talloc_free(tmp_ctx);
878                 return;
879         }
880
881         for (i=0;i<dbmap->num;i++) {
882                 if (dbmap->dbs[i].dbid == recs->db_id) {
883                         persistent = dbmap->dbs[i].persistent;
884                         break;
885                 }
886         }
887         if (i == dbmap->num) {
888                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
889                 talloc_free(tmp_ctx);
890                 return;         
891         }
892
893         /* find the name of this database */
894         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
895                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
896                 talloc_free(tmp_ctx);
897                 return;
898         }
899
900         /* attach to it */
901         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
902         if (ctdb_db == NULL) {
903                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
904                 talloc_free(tmp_ctx);
905                 return;
906         }
907
908         v = talloc_zero(rec, struct vacuum_info);
909         if (v == NULL) {
910                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
911                 talloc_free(tmp_ctx);
912                 return;
913         }
914
915         v->rec = rec;
916         v->srcnode = srcnode;
917         v->ctdb_db = ctdb_db;
918         v->recs = talloc_memdup(v, recs, data.dsize);
919         if (v->recs == NULL) {
920                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
921                 talloc_free(v);
922                 talloc_free(tmp_ctx);
923                 return;         
924         }
925         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
926
927         DLIST_ADD(rec->vacuum_info, v);
928
929         talloc_set_destructor(v, vacuum_info_destructor);
930
931         vacuum_fetch_next(v);
932         talloc_free(tmp_ctx);
933 }
934
935
936 /*
937   called when ctdb_wait_timeout should finish
938  */
939 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
940                               struct timeval yt, void *p)
941 {
942         uint32_t *timed_out = (uint32_t *)p;
943         (*timed_out) = 1;
944 }
945
946 /*
947   wait for a given number of seconds
948  */
949 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
950 {
951         uint32_t timed_out = 0;
952         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
953         while (!timed_out) {
954                 event_loop_once(ctdb->ev);
955         }
956 }
957
958 /*
959   called when an election times out (ends)
960  */
961 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
962                                   struct timeval t, void *p)
963 {
964         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
965         rec->election_timeout = NULL;
966 }
967
968
969 /*
970   wait for an election to finish. It finished election_timeout seconds after
971   the last election packet is received
972  */
973 static void ctdb_wait_election(struct ctdb_recoverd *rec)
974 {
975         struct ctdb_context *ctdb = rec->ctdb;
976         while (rec->election_timeout) {
977                 event_loop_once(ctdb->ev);
978         }
979 }
980
981 /*
982   Update our local flags from all remote connected nodes. 
983   This is only run when we are or we belive we are the recovery master
984  */
985 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
986 {
987         int j;
988         struct ctdb_context *ctdb = rec->ctdb;
989         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
990
991         /* get the nodemap for all active remote nodes and verify
992            they are the same as for this node
993          */
994         for (j=0; j<nodemap->num; j++) {
995                 struct ctdb_node_map *remote_nodemap=NULL;
996                 int ret;
997
998                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
999                         continue;
1000                 }
1001                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1002                         continue;
1003                 }
1004
1005                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1006                                            mem_ctx, &remote_nodemap);
1007                 if (ret != 0) {
1008                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1009                                   nodemap->nodes[j].pnn));
1010                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1011                         talloc_free(mem_ctx);
1012                         return MONITOR_FAILED;
1013                 }
1014                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1015                         int ban_changed = (nodemap->nodes[j].flags ^ remote_nodemap->nodes[j].flags) & NODE_FLAGS_BANNED;
1016
1017                         if (ban_changed) {
1018                                 DEBUG(DEBUG_NOTICE,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
1019                                 nodemap->nodes[j].pnn,
1020                                 remote_nodemap->nodes[j].flags,
1021                                 nodemap->nodes[j].flags));
1022                         }
1023
1024                         /* We should tell our daemon about this so it
1025                            updates its flags or else we will log the same 
1026                            message again in the next iteration of recovery.
1027                            Since we are the recovery master we can just as
1028                            well update the flags on all nodes.
1029                         */
1030                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
1031                         if (ret != 0) {
1032                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1033                                 return -1;
1034                         }
1035
1036                         /* Update our local copy of the flags in the recovery
1037                            daemon.
1038                         */
1039                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1040                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1041                                  nodemap->nodes[j].flags));
1042                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1043
1044                         /* If the BANNED flag has changed for the node
1045                            this is a good reason to do a new election.
1046                          */
1047                         if (ban_changed) {
1048                                 talloc_free(mem_ctx);
1049                                 return MONITOR_ELECTION_NEEDED;
1050                         }
1051
1052                 }
1053                 talloc_free(remote_nodemap);
1054         }
1055         talloc_free(mem_ctx);
1056         return MONITOR_OK;
1057 }
1058
1059
1060 /* Create a new random generation ip. 
1061    The generation id can not be the INVALID_GENERATION id
1062 */
1063 static uint32_t new_generation(void)
1064 {
1065         uint32_t generation;
1066
1067         while (1) {
1068                 generation = random();
1069
1070                 if (generation != INVALID_GENERATION) {
1071                         break;
1072                 }
1073         }
1074
1075         return generation;
1076 }
1077
1078
1079 /*
1080   create a temporary working database
1081  */
1082 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1083 {
1084         char *name;
1085         struct tdb_wrap *recdb;
1086         unsigned tdb_flags;
1087
1088         /* open up the temporary recovery database */
1089         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
1090         if (name == NULL) {
1091                 return NULL;
1092         }
1093         unlink(name);
1094
1095         tdb_flags = TDB_NOLOCK;
1096         if (!ctdb->do_setsched) {
1097                 tdb_flags |= TDB_NOMMAP;
1098         }
1099
1100         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1101                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1102         if (recdb == NULL) {
1103                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1104         }
1105
1106         talloc_free(name);
1107
1108         return recdb;
1109 }
1110
1111
1112 /* 
1113    a traverse function for pulling all relevent records from recdb
1114  */
1115 struct recdb_data {
1116         struct ctdb_context *ctdb;
1117         struct ctdb_marshall_buffer *recdata;
1118         uint32_t len;
1119         bool failed;
1120 };
1121
1122 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1123 {
1124         struct recdb_data *params = (struct recdb_data *)p;
1125         struct ctdb_rec_data *rec;
1126         struct ctdb_ltdb_header *hdr;
1127
1128         /* skip empty records */
1129         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1130                 return 0;
1131         }
1132
1133         /* update the dmaster field to point to us */
1134         hdr = (struct ctdb_ltdb_header *)data.dptr;
1135         hdr->dmaster = params->ctdb->pnn;
1136
1137         /* add the record to the blob ready to send to the nodes */
1138         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1139         if (rec == NULL) {
1140                 params->failed = true;
1141                 return -1;
1142         }
1143         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1144         if (params->recdata == NULL) {
1145                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1146                          rec->length + params->len, params->recdata->count));
1147                 params->failed = true;
1148                 return -1;
1149         }
1150         params->recdata->count++;
1151         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1152         params->len += rec->length;
1153         talloc_free(rec);
1154
1155         return 0;
1156 }
1157
1158 /*
1159   push the recdb database out to all nodes
1160  */
1161 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1162                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1163 {
1164         struct recdb_data params;
1165         struct ctdb_marshall_buffer *recdata;
1166         TDB_DATA outdata;
1167         TALLOC_CTX *tmp_ctx;
1168         uint32_t *nodes;
1169
1170         tmp_ctx = talloc_new(ctdb);
1171         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1172
1173         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1174         CTDB_NO_MEMORY(ctdb, recdata);
1175
1176         recdata->db_id = dbid;
1177
1178         params.ctdb = ctdb;
1179         params.recdata = recdata;
1180         params.len = offsetof(struct ctdb_marshall_buffer, data);
1181         params.failed = false;
1182
1183         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1184                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1185                 talloc_free(params.recdata);
1186                 talloc_free(tmp_ctx);
1187                 return -1;
1188         }
1189
1190         if (params.failed) {
1191                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1192                 talloc_free(params.recdata);
1193                 talloc_free(tmp_ctx);
1194                 return -1;              
1195         }
1196
1197         recdata = params.recdata;
1198
1199         outdata.dptr = (void *)recdata;
1200         outdata.dsize = params.len;
1201
1202         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1203         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1204                                         nodes,
1205                                         CONTROL_TIMEOUT(), false, outdata,
1206                                         NULL, NULL,
1207                                         NULL) != 0) {
1208                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1209                 talloc_free(recdata);
1210                 talloc_free(tmp_ctx);
1211                 return -1;
1212         }
1213
1214         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1215                   dbid, recdata->count));
1216
1217         talloc_free(recdata);
1218         talloc_free(tmp_ctx);
1219
1220         return 0;
1221 }
1222
1223
1224 /*
1225   go through a full recovery on one database 
1226  */
1227 static int recover_database(struct ctdb_recoverd *rec, 
1228                             TALLOC_CTX *mem_ctx,
1229                             uint32_t dbid,
1230                             uint32_t pnn, 
1231                             struct ctdb_node_map *nodemap,
1232                             uint32_t transaction_id)
1233 {
1234         struct tdb_wrap *recdb;
1235         int ret;
1236         struct ctdb_context *ctdb = rec->ctdb;
1237         TDB_DATA data;
1238         struct ctdb_control_wipe_database w;
1239         uint32_t *nodes;
1240
1241         recdb = create_recdb(ctdb, mem_ctx);
1242         if (recdb == NULL) {
1243                 return -1;
1244         }
1245
1246         /* pull all remote databases onto the recdb */
1247         ret = pull_remote_database(ctdb, nodemap, recdb, dbid);
1248         if (ret != 0) {
1249                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1250                 return -1;
1251         }
1252
1253         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1254
1255         /* wipe all the remote databases. This is safe as we are in a transaction */
1256         w.db_id = dbid;
1257         w.transaction_id = transaction_id;
1258
1259         data.dptr = (void *)&w;
1260         data.dsize = sizeof(w);
1261
1262         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1263         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1264                                         nodes,
1265                                         CONTROL_TIMEOUT(), false, data,
1266                                         NULL, NULL,
1267                                         NULL) != 0) {
1268                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1269                 talloc_free(recdb);
1270                 return -1;
1271         }
1272         
1273         /* push out the correct database. This sets the dmaster and skips 
1274            the empty records */
1275         ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
1276         if (ret != 0) {
1277                 talloc_free(recdb);
1278                 return -1;
1279         }
1280
1281         /* all done with this database */
1282         talloc_free(recdb);
1283
1284         return 0;
1285 }
1286
1287 /*
1288   reload the nodes file 
1289 */
1290 static void reload_nodes_file(struct ctdb_context *ctdb)
1291 {
1292         ctdb->nodes = NULL;
1293         ctdb_load_nodes_file(ctdb);
1294 }
1295
1296         
1297 /*
1298   we are the recmaster, and recovery is needed - start a recovery run
1299  */
1300 static int do_recovery(struct ctdb_recoverd *rec, 
1301                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1302                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
1303                        int32_t culprit)
1304 {
1305         struct ctdb_context *ctdb = rec->ctdb;
1306         int i, j, ret;
1307         uint32_t generation;
1308         struct ctdb_dbid_map *dbmap;
1309         TDB_DATA data;
1310         uint32_t *nodes;
1311
1312         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1313
1314         if (ctdb->num_nodes != nodemap->num) {
1315                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
1316                 reload_nodes_file(ctdb);
1317                 return -1;
1318         }
1319
1320         /* if recovery fails, force it again */
1321         rec->need_recovery = true;
1322
1323         if (culprit != -1) {
1324                 ctdb_set_culprit(rec, culprit);
1325         }
1326
1327         if (rec->culprit_counter > 2*nodemap->num) {
1328                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
1329                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
1330                          ctdb->tunable.recovery_ban_period));
1331                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
1332         }
1333
1334         if (!ctdb_recovery_lock(ctdb, true)) {
1335                 ctdb_set_culprit(rec, pnn);
1336                 DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1337                 return -1;
1338         }
1339
1340         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
1341
1342         /* get a list of all databases */
1343         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1344         if (ret != 0) {
1345                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1346                 return -1;
1347         }
1348
1349         /* we do the db creation before we set the recovery mode, so the freeze happens
1350            on all databases we will be dealing with. */
1351
1352         /* verify that we have all the databases any other node has */
1353         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1354         if (ret != 0) {
1355                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1356                 return -1;
1357         }
1358
1359         /* verify that all other nodes have all our databases */
1360         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1361         if (ret != 0) {
1362                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1363                 return -1;
1364         }
1365
1366         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1367
1368
1369         /* set recovery mode to active on all nodes */
1370         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1371         if (ret != 0) {
1372                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1373                 return -1;
1374         }
1375
1376         /* execute the "startrecovery" event script on all nodes */
1377         ret = run_startrecovery_eventscript(rec, nodemap);
1378         if (ret!=0) {
1379                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1380                 return -1;
1381         }
1382
1383         /* pick a new generation number */
1384         generation = new_generation();
1385
1386         /* change the vnnmap on this node to use the new generation 
1387            number but not on any other nodes.
1388            this guarantees that if we abort the recovery prematurely
1389            for some reason (a node stops responding?)
1390            that we can just return immediately and we will reenter
1391            recovery shortly again.
1392            I.e. we deliberately leave the cluster with an inconsistent
1393            generation id to allow us to abort recovery at any stage and
1394            just restart it from scratch.
1395          */
1396         vnnmap->generation = generation;
1397         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1398         if (ret != 0) {
1399                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1400                 return -1;
1401         }
1402
1403         data.dptr = (void *)&generation;
1404         data.dsize = sizeof(uint32_t);
1405
1406         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1407         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1408                                         nodes,
1409                                         CONTROL_TIMEOUT(), false, data,
1410                                         NULL, NULL,
1411                                         NULL) != 0) {
1412                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1413                 return -1;
1414         }
1415
1416         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1417
1418         for (i=0;i<dbmap->num;i++) {
1419                 if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
1420                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1421                         return -1;
1422                 }
1423         }
1424
1425         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1426
1427         /* commit all the changes */
1428         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1429                                         nodes,
1430                                         CONTROL_TIMEOUT(), false, data,
1431                                         NULL, NULL,
1432                                         NULL) != 0) {
1433                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1434                 return -1;
1435         }
1436
1437         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1438         
1439
1440         /* update the capabilities for all nodes */
1441         ret = update_capabilities(ctdb, nodemap);
1442         if (ret!=0) {
1443                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1444                 return -1;
1445         }
1446
1447         /* build a new vnn map with all the currently active and
1448            unbanned nodes */
1449         generation = new_generation();
1450         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1451         CTDB_NO_MEMORY(ctdb, vnnmap);
1452         vnnmap->generation = generation;
1453         vnnmap->size = 0;
1454         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1455         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1456         for (i=j=0;i<nodemap->num;i++) {
1457                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1458                         continue;
1459                 }
1460                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1461                         /* this node can not be an lmaster */
1462                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1463                         continue;
1464                 }
1465
1466                 vnnmap->size++;
1467                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1468                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1469                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1470
1471         }
1472         if (vnnmap->size == 0) {
1473                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1474                 vnnmap->size++;
1475                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1476                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1477                 vnnmap->map[0] = pnn;
1478         }       
1479
1480         /* update to the new vnnmap on all nodes */
1481         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1482         if (ret != 0) {
1483                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1484                 return -1;
1485         }
1486
1487         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1488
1489         /* update recmaster to point to us for all nodes */
1490         ret = set_recovery_master(ctdb, nodemap, pnn);
1491         if (ret!=0) {
1492                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1493                 return -1;
1494         }
1495
1496         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1497
1498         /*
1499           update all nodes to have the same flags that we have
1500          */
1501         for (i=0;i<nodemap->num;i++) {
1502                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1503                         continue;
1504                 }
1505
1506                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1507                 if (ret != 0) {
1508                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1509                         return -1;
1510                 }
1511         }
1512
1513         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1514
1515         /* disable recovery mode */
1516         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
1517         if (ret != 0) {
1518                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1519                 return -1;
1520         }
1521
1522         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1523
1524         /*
1525           tell nodes to takeover their public IPs
1526          */
1527         rec->need_takeover_run = false;
1528         ret = ctdb_takeover_run(ctdb, nodemap);
1529         if (ret != 0) {
1530                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1531                 return -1;
1532         }
1533         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1534
1535         /* execute the "recovered" event script on all nodes */
1536         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1537         if (ret!=0) {
1538                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1539                 return -1;
1540         }
1541
1542         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1543
1544         /* send a message to all clients telling them that the cluster 
1545            has been reconfigured */
1546         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1547
1548         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1549
1550         rec->need_recovery = false;
1551
1552         /* We just finished a recovery successfully. 
1553            We now wait for rerecovery_timeout before we allow 
1554            another recovery to take place.
1555         */
1556         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1557         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1558         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1559
1560         return 0;
1561 }
1562
1563
1564 /*
1565   elections are won by first checking the number of connected nodes, then
1566   the priority time, then the pnn
1567  */
1568 struct election_message {
1569         uint32_t num_connected;
1570         struct timeval priority_time;
1571         uint32_t pnn;
1572         uint32_t node_flags;
1573 };
1574
1575 /*
1576   form this nodes election data
1577  */
1578 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1579 {
1580         int ret, i;
1581         struct ctdb_node_map *nodemap;
1582         struct ctdb_context *ctdb = rec->ctdb;
1583
1584         ZERO_STRUCTP(em);
1585
1586         em->pnn = rec->ctdb->pnn;
1587         em->priority_time = rec->priority_time;
1588         em->node_flags = rec->node_flags;
1589
1590         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1591         if (ret != 0) {
1592                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1593                 return;
1594         }
1595
1596         for (i=0;i<nodemap->num;i++) {
1597                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1598                         em->num_connected++;
1599                 }
1600         }
1601
1602         /* we shouldnt try to win this election if we cant be a recmaster */
1603         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1604                 em->num_connected = 0;
1605                 em->priority_time = timeval_current();
1606         }
1607
1608         talloc_free(nodemap);
1609 }
1610
1611 /*
1612   see if the given election data wins
1613  */
1614 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1615 {
1616         struct election_message myem;
1617         int cmp = 0;
1618
1619         ctdb_election_data(rec, &myem);
1620
1621         /* we cant win if we dont have the recmaster capability */
1622         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1623                 return false;
1624         }
1625
1626         /* we cant win if we are banned */
1627         if (rec->node_flags & NODE_FLAGS_BANNED) {
1628                 return false;
1629         }       
1630
1631         /* we will automatically win if the other node is banned */
1632         if (em->node_flags & NODE_FLAGS_BANNED) {
1633                 return true;
1634         }
1635
1636         /* try to use the most connected node */
1637         if (cmp == 0) {
1638                 cmp = (int)myem.num_connected - (int)em->num_connected;
1639         }
1640
1641         /* then the longest running node */
1642         if (cmp == 0) {
1643                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1644         }
1645
1646         if (cmp == 0) {
1647                 cmp = (int)myem.pnn - (int)em->pnn;
1648         }
1649
1650         return cmp > 0;
1651 }
1652
1653 /*
1654   send out an election request
1655  */
1656 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1657 {
1658         int ret;
1659         TDB_DATA election_data;
1660         struct election_message emsg;
1661         uint64_t srvid;
1662         struct ctdb_context *ctdb = rec->ctdb;
1663
1664         srvid = CTDB_SRVID_RECOVERY;
1665
1666         ctdb_election_data(rec, &emsg);
1667
1668         election_data.dsize = sizeof(struct election_message);
1669         election_data.dptr  = (unsigned char *)&emsg;
1670
1671
1672         /* send an election message to all active nodes */
1673         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1674
1675
1676         /* A new node that is already frozen has entered the cluster.
1677            The existing nodes are not frozen and dont need to be frozen
1678            until the election has ended and we start the actual recovery
1679         */
1680         if (update_recmaster == true) {
1681                 /* first we assume we will win the election and set 
1682                    recoverymaster to be ourself on the current node
1683                  */
1684                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1685                 if (ret != 0) {
1686                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1687                         return -1;
1688                 }
1689         }
1690
1691
1692         return 0;
1693 }
1694
1695 /*
1696   this function will unban all nodes in the cluster
1697 */
1698 static void unban_all_nodes(struct ctdb_context *ctdb)
1699 {
1700         int ret, i;
1701         struct ctdb_node_map *nodemap;
1702         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1703         
1704         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1705         if (ret != 0) {
1706                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1707                 return;
1708         }
1709
1710         for (i=0;i<nodemap->num;i++) {
1711                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1712                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1713                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1714                 }
1715         }
1716
1717         talloc_free(tmp_ctx);
1718 }
1719
1720
1721 /*
1722   we think we are winning the election - send a broadcast election request
1723  */
1724 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1725 {
1726         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1727         int ret;
1728
1729         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1730         if (ret != 0) {
1731                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1732         }
1733
1734         talloc_free(rec->send_election_te);
1735         rec->send_election_te = NULL;
1736 }
1737
1738 /*
1739   handler for memory dumps
1740 */
1741 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1742                              TDB_DATA data, void *private_data)
1743 {
1744         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1745         TDB_DATA *dump;
1746         int ret;
1747         struct rd_memdump_reply *rd;
1748
1749         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1750                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1751                 talloc_free(tmp_ctx);
1752                 return;
1753         }
1754         rd = (struct rd_memdump_reply *)data.dptr;
1755
1756         dump = talloc_zero(tmp_ctx, TDB_DATA);
1757         if (dump == NULL) {
1758                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1759                 talloc_free(tmp_ctx);
1760                 return;
1761         }
1762         ret = ctdb_dump_memory(ctdb, dump);
1763         if (ret != 0) {
1764                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1765                 talloc_free(tmp_ctx);
1766                 return;
1767         }
1768
1769 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1770
1771         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1772         if (ret != 0) {
1773                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1774                 talloc_free(tmp_ctx);
1775                 return;
1776         }
1777
1778         talloc_free(tmp_ctx);
1779 }
1780
1781 /*
1782   handler for recovery master elections
1783 */
1784 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1785                              TDB_DATA data, void *private_data)
1786 {
1787         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1788         int ret;
1789         struct election_message *em = (struct election_message *)data.dptr;
1790         TALLOC_CTX *mem_ctx;
1791
1792         /* we got an election packet - update the timeout for the election */
1793         talloc_free(rec->election_timeout);
1794         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1795                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1796                                                 ctdb_election_timeout, rec);
1797
1798         mem_ctx = talloc_new(ctdb);
1799
1800         /* someone called an election. check their election data
1801            and if we disagree and we would rather be the elected node, 
1802            send a new election message to all other nodes
1803          */
1804         if (ctdb_election_win(rec, em)) {
1805                 if (!rec->send_election_te) {
1806                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1807                                                                 timeval_current_ofs(0, 500000),
1808                                                                 election_send_request, rec);
1809                 }
1810                 talloc_free(mem_ctx);
1811                 /*unban_all_nodes(ctdb);*/
1812                 return;
1813         }
1814         
1815         /* we didn't win */
1816         talloc_free(rec->send_election_te);
1817         rec->send_election_te = NULL;
1818
1819         /* release the recmaster lock */
1820         if (em->pnn != ctdb->pnn &&
1821             ctdb->recovery_lock_fd != -1) {
1822                 close(ctdb->recovery_lock_fd);
1823                 ctdb->recovery_lock_fd = -1;
1824                 unban_all_nodes(ctdb);
1825         }
1826
1827         /* ok, let that guy become recmaster then */
1828         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1829         if (ret != 0) {
1830                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
1831                 talloc_free(mem_ctx);
1832                 return;
1833         }
1834
1835         /* release any bans */
1836         rec->last_culprit = (uint32_t)-1;
1837         talloc_free(rec->banned_nodes);
1838         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1839         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1840
1841         talloc_free(mem_ctx);
1842         return;
1843 }
1844
1845
1846 /*
1847   force the start of the election process
1848  */
1849 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1850                            struct ctdb_node_map *nodemap)
1851 {
1852         int ret;
1853         struct ctdb_context *ctdb = rec->ctdb;
1854
1855         /* set all nodes to recovery mode to stop all internode traffic */
1856         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1857         if (ret != 0) {
1858                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1859                 return;
1860         }
1861
1862         talloc_free(rec->election_timeout);
1863         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1864                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1865                                                 ctdb_election_timeout, rec);
1866
1867         ret = send_election_request(rec, pnn, true);
1868         if (ret!=0) {
1869                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
1870                 return;
1871         }
1872
1873         /* wait for a few seconds to collect all responses */
1874         ctdb_wait_election(rec);
1875 }
1876
1877
1878
1879 /*
1880   handler for when a node changes its flags
1881 */
1882 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1883                             TDB_DATA data, void *private_data)
1884 {
1885         int ret;
1886         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1887         struct ctdb_node_map *nodemap=NULL;
1888         TALLOC_CTX *tmp_ctx;
1889         uint32_t changed_flags;
1890         int i;
1891         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1892
1893         if (data.dsize != sizeof(*c)) {
1894                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1895                 return;
1896         }
1897
1898         tmp_ctx = talloc_new(ctdb);
1899         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1900
1901         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1902         if (ret != 0) {
1903                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1904                 talloc_free(tmp_ctx);
1905                 return;         
1906         }
1907
1908
1909         for (i=0;i<nodemap->num;i++) {
1910                 if (nodemap->nodes[i].pnn == c->pnn) break;
1911         }
1912
1913         if (i == nodemap->num) {
1914                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1915                 talloc_free(tmp_ctx);
1916                 return;
1917         }
1918
1919         changed_flags = c->old_flags ^ c->new_flags;
1920
1921         if (nodemap->nodes[i].flags != c->new_flags) {
1922                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1923         }
1924
1925         nodemap->nodes[i].flags = c->new_flags;
1926
1927         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1928                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1929
1930         if (ret == 0) {
1931                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1932                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1933         }
1934         
1935         if (ret == 0 &&
1936             ctdb->recovery_master == ctdb->pnn &&
1937             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
1938                 /* Only do the takeover run if the perm disabled or unhealthy
1939                    flags changed since these will cause an ip failover but not
1940                    a recovery.
1941                    If the node became disconnected or banned this will also
1942                    lead to an ip address failover but that is handled 
1943                    during recovery
1944                 */
1945                 if (changed_flags & NODE_FLAGS_DISABLED) {
1946                         rec->need_takeover_run = true;
1947                 }
1948         }
1949
1950         talloc_free(tmp_ctx);
1951 }
1952
1953 /*
1954   handler for when we need to push out flag changes ot all other nodes
1955 */
1956 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1957                             TDB_DATA data, void *private_data)
1958 {
1959         int ret;
1960         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1961
1962         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), c->pnn, c->new_flags, ~c->new_flags);
1963         if (ret != 0) {
1964                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1965         }
1966 }
1967
1968
1969 struct verify_recmode_normal_data {
1970         uint32_t count;
1971         enum monitor_result status;
1972 };
1973
1974 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1975 {
1976         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
1977
1978
1979         /* one more node has responded with recmode data*/
1980         rmdata->count--;
1981
1982         /* if we failed to get the recmode, then return an error and let
1983            the main loop try again.
1984         */
1985         if (state->state != CTDB_CONTROL_DONE) {
1986                 if (rmdata->status == MONITOR_OK) {
1987                         rmdata->status = MONITOR_FAILED;
1988                 }
1989                 return;
1990         }
1991
1992         /* if we got a response, then the recmode will be stored in the
1993            status field
1994         */
1995         if (state->status != CTDB_RECOVERY_NORMAL) {
1996                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
1997                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1998         }
1999
2000         return;
2001 }
2002
2003
2004 /* verify that all nodes are in normal recovery mode */
2005 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2006 {
2007         struct verify_recmode_normal_data *rmdata;
2008         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2009         struct ctdb_client_control_state *state;
2010         enum monitor_result status;
2011         int j;
2012         
2013         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2014         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2015         rmdata->count  = 0;
2016         rmdata->status = MONITOR_OK;
2017
2018         /* loop over all active nodes and send an async getrecmode call to 
2019            them*/
2020         for (j=0; j<nodemap->num; j++) {
2021                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2022                         continue;
2023                 }
2024                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2025                                         CONTROL_TIMEOUT(), 
2026                                         nodemap->nodes[j].pnn);
2027                 if (state == NULL) {
2028                         /* we failed to send the control, treat this as 
2029                            an error and try again next iteration
2030                         */                      
2031                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2032                         talloc_free(mem_ctx);
2033                         return MONITOR_FAILED;
2034                 }
2035
2036                 /* set up the callback functions */
2037                 state->async.fn = verify_recmode_normal_callback;
2038                 state->async.private_data = rmdata;
2039
2040                 /* one more control to wait for to complete */
2041                 rmdata->count++;
2042         }
2043
2044
2045         /* now wait for up to the maximum number of seconds allowed
2046            or until all nodes we expect a response from has replied
2047         */
2048         while (rmdata->count > 0) {
2049                 event_loop_once(ctdb->ev);
2050         }
2051
2052         status = rmdata->status;
2053         talloc_free(mem_ctx);
2054         return status;
2055 }
2056
2057
2058 struct verify_recmaster_data {
2059         struct ctdb_recoverd *rec;
2060         uint32_t count;
2061         uint32_t pnn;
2062         enum monitor_result status;
2063 };
2064
2065 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2066 {
2067         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2068
2069
2070         /* one more node has responded with recmaster data*/
2071         rmdata->count--;
2072
2073         /* if we failed to get the recmaster, then return an error and let
2074            the main loop try again.
2075         */
2076         if (state->state != CTDB_CONTROL_DONE) {
2077                 if (rmdata->status == MONITOR_OK) {
2078                         rmdata->status = MONITOR_FAILED;
2079                 }
2080                 return;
2081         }
2082
2083         /* if we got a response, then the recmaster will be stored in the
2084            status field
2085         */
2086         if (state->status != rmdata->pnn) {
2087                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2088                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2089                 rmdata->status = MONITOR_ELECTION_NEEDED;
2090         }
2091
2092         return;
2093 }
2094
2095
2096 /* verify that all nodes agree that we are the recmaster */
2097 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2098 {
2099         struct ctdb_context *ctdb = rec->ctdb;
2100         struct verify_recmaster_data *rmdata;
2101         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2102         struct ctdb_client_control_state *state;
2103         enum monitor_result status;
2104         int j;
2105         
2106         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2107         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2108         rmdata->rec    = rec;
2109         rmdata->count  = 0;
2110         rmdata->pnn    = pnn;
2111         rmdata->status = MONITOR_OK;
2112
2113         /* loop over all active nodes and send an async getrecmaster call to 
2114            them*/
2115         for (j=0; j<nodemap->num; j++) {
2116                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2117                         continue;
2118                 }
2119                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2120                                         CONTROL_TIMEOUT(),
2121                                         nodemap->nodes[j].pnn);
2122                 if (state == NULL) {
2123                         /* we failed to send the control, treat this as 
2124                            an error and try again next iteration
2125                         */                      
2126                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2127                         talloc_free(mem_ctx);
2128                         return MONITOR_FAILED;
2129                 }
2130
2131                 /* set up the callback functions */
2132                 state->async.fn = verify_recmaster_callback;
2133                 state->async.private_data = rmdata;
2134
2135                 /* one more control to wait for to complete */
2136                 rmdata->count++;
2137         }
2138
2139
2140         /* now wait for up to the maximum number of seconds allowed
2141            or until all nodes we expect a response from has replied
2142         */
2143         while (rmdata->count > 0) {
2144                 event_loop_once(ctdb->ev);
2145         }
2146
2147         status = rmdata->status;
2148         talloc_free(mem_ctx);
2149         return status;
2150 }
2151
2152
2153 /* called to check that the allocation of public ip addresses is ok.
2154 */
2155 static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
2156 {
2157         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2158         struct ctdb_all_public_ips *ips = NULL;
2159         struct ctdb_uptime *uptime1 = NULL;
2160         struct ctdb_uptime *uptime2 = NULL;
2161         int ret, j;
2162
2163         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2164                                 CTDB_CURRENT_NODE, &uptime1);
2165         if (ret != 0) {
2166                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2167                 talloc_free(mem_ctx);
2168                 return -1;
2169         }
2170
2171         /* read the ip allocation from the local node */
2172         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2173         if (ret != 0) {
2174                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2175                 talloc_free(mem_ctx);
2176                 return -1;
2177         }
2178
2179         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2180                                 CTDB_CURRENT_NODE, &uptime2);
2181         if (ret != 0) {
2182                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2183                 talloc_free(mem_ctx);
2184                 return -1;
2185         }
2186
2187         /* skip the check if the startrecovery time has changed */
2188         if (timeval_compare(&uptime1->last_recovery_started,
2189                             &uptime2->last_recovery_started) != 0) {
2190                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2191                 talloc_free(mem_ctx);
2192                 return 0;
2193         }
2194
2195         /* skip the check if the endrecovery time has changed */
2196         if (timeval_compare(&uptime1->last_recovery_finished,
2197                             &uptime2->last_recovery_finished) != 0) {
2198                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2199                 talloc_free(mem_ctx);
2200                 return 0;
2201         }
2202
2203         /* skip the check if we have started but not finished recovery */
2204         if (timeval_compare(&uptime1->last_recovery_finished,
2205                             &uptime1->last_recovery_started) != 1) {
2206                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery. skipping public ip address check\n"));
2207                 talloc_free(mem_ctx);
2208
2209                 return 0;
2210         }
2211
2212         /* verify that we have the ip addresses we should have
2213            and we dont have ones we shouldnt have.
2214            if we find an inconsistency we set recmode to
2215            active on the local node and wait for the recmaster
2216            to do a full blown recovery
2217         */
2218         for (j=0; j<ips->num; j++) {
2219                 if (ips->ips[j].pnn == pnn) {
2220                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2221                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2222                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2223                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2224                                 if (ret != 0) {
2225                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2226
2227                                         talloc_free(mem_ctx);
2228                                         return -1;
2229                                 }
2230                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2231                                 if (ret != 0) {
2232                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2233
2234                                         talloc_free(mem_ctx);
2235                                         return -1;
2236                                 }
2237                         }
2238                 } else {
2239                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2240                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2241                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2242
2243                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2244                                 if (ret != 0) {
2245                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2246
2247                                         talloc_free(mem_ctx);
2248                                         return -1;
2249                                 }
2250                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2251                                 if (ret != 0) {
2252                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2253
2254                                         talloc_free(mem_ctx);
2255                                         return -1;
2256                                 }
2257                         }
2258                 }
2259         }
2260
2261         talloc_free(mem_ctx);
2262         return 0;
2263 }
2264
2265
2266 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2267 {
2268         struct ctdb_node_map **remote_nodemaps = callback_data;
2269
2270         if (node_pnn >= ctdb->num_nodes) {
2271                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2272                 return;
2273         }
2274
2275         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2276
2277 }
2278
2279 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2280         struct ctdb_node_map *nodemap,
2281         struct ctdb_node_map **remote_nodemaps)
2282 {
2283         uint32_t *nodes;
2284
2285         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2286         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2287                                         nodes,
2288                                         CONTROL_TIMEOUT(), false, tdb_null,
2289                                         async_getnodemap_callback,
2290                                         NULL,
2291                                         remote_nodemaps) != 0) {
2292                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2293
2294                 return -1;
2295         }
2296
2297         return 0;
2298 }
2299
2300 /*
2301   the main monitoring loop
2302  */
2303 static void monitor_cluster(struct ctdb_context *ctdb)
2304 {
2305         uint32_t pnn;
2306         TALLOC_CTX *mem_ctx=NULL;
2307         struct ctdb_node_map *nodemap=NULL;
2308         struct ctdb_node_map *recmaster_nodemap=NULL;
2309         struct ctdb_node_map **remote_nodemaps=NULL;
2310         struct ctdb_vnn_map *vnnmap=NULL;
2311         struct ctdb_vnn_map *remote_vnnmap=NULL;
2312         int32_t debug_level;
2313         int i, j, ret;
2314         struct ctdb_recoverd *rec;
2315         char c;
2316
2317         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2318
2319         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2320         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2321
2322         rec->ctdb = ctdb;
2323         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
2324         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
2325
2326         rec->priority_time = timeval_current();
2327
2328         /* register a message port for sending memory dumps */
2329         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2330
2331         /* register a message port for recovery elections */
2332         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2333
2334         /* when nodes are disabled/enabled */
2335         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2336
2337         /* when we are asked to puch out a flag change */
2338         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2339
2340         /* when nodes are banned */
2341         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
2342
2343         /* and one for when nodes are unbanned */
2344         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
2345
2346         /* register a message port for vacuum fetch */
2347         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2348
2349 again:
2350         if (mem_ctx) {
2351                 talloc_free(mem_ctx);
2352                 mem_ctx = NULL;
2353         }
2354         mem_ctx = talloc_new(ctdb);
2355         if (!mem_ctx) {
2356                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2357                 exit(-1);
2358         }
2359
2360         /* we only check for recovery once every second */
2361         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2362
2363         /* verify that the main daemon is still running */
2364         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2365                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2366                 exit(-1);
2367         }
2368
2369         /* ping the local daemon to tell it we are alive */
2370         ctdb_ctrl_recd_ping(ctdb);
2371
2372         if (rec->election_timeout) {
2373                 /* an election is in progress */
2374                 goto again;
2375         }
2376
2377         /* read the debug level from the parent and update locally */
2378         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2379         if (ret !=0) {
2380                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2381                 goto again;
2382         }
2383         LogLevel = debug_level;
2384
2385
2386         /* We must check if we need to ban a node here but we want to do this
2387            as early as possible so we dont wait until we have pulled the node
2388            map from the local node. thats why we have the hardcoded value 20
2389         */
2390         if (rec->culprit_counter > 20) {
2391                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
2392                          rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
2393                          ctdb->tunable.recovery_ban_period));
2394                 ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
2395         }
2396
2397         /* get relevant tunables */
2398         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2399         if (ret != 0) {
2400                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2401                 goto again;
2402         }
2403
2404         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2405         if (pnn == (uint32_t)-1) {
2406                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2407                 goto again;
2408         }
2409
2410         /* get the vnnmap */
2411         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2412         if (ret != 0) {
2413                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2414                 goto again;
2415         }
2416
2417
2418         /* get number of nodes */
2419         if (rec->nodemap) {
2420                 talloc_free(rec->nodemap);
2421                 rec->nodemap = NULL;
2422                 nodemap=NULL;
2423         }
2424         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2425         if (ret != 0) {
2426                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2427                 goto again;
2428         }
2429         nodemap = rec->nodemap;
2430
2431         /* check which node is the recovery master */
2432         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2433         if (ret != 0) {
2434                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2435                 goto again;
2436         }
2437
2438         if (rec->recmaster == (uint32_t)-1) {
2439                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2440                 force_election(rec, pnn, nodemap);
2441                 goto again;
2442         }
2443         
2444         /* check that we (recovery daemon) and the local ctdb daemon
2445            agrees on whether we are banned or not
2446         */
2447         if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
2448                 if (rec->banned_nodes[pnn] == NULL) {
2449                         if (rec->recmaster == pnn) {
2450                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2451
2452                                 ctdb_unban_node(rec, pnn);
2453                         } else {
2454                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2455                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2456                                 ctdb_set_culprit(rec, pnn);
2457                         }
2458                         goto again;
2459                 }
2460         } else {
2461                 if (rec->banned_nodes[pnn] != NULL) {
2462                         if (rec->recmaster == pnn) {
2463                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2464
2465                                 ctdb_unban_node(rec, pnn);
2466                         } else {
2467                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2468
2469                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2470                                 ctdb_set_culprit(rec, pnn);
2471                         }
2472                         goto again;
2473                 }
2474         }
2475
2476         /* remember our own node flags */
2477         rec->node_flags = nodemap->nodes[pnn].flags;
2478
2479         /* count how many active nodes there are */
2480         rec->num_active    = 0;
2481         rec->num_connected = 0;
2482         for (i=0; i<nodemap->num; i++) {
2483                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2484                         rec->num_active++;
2485                 }
2486                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2487                         rec->num_connected++;
2488                 }
2489         }
2490
2491
2492         /* verify that the recmaster node is still active */
2493         for (j=0; j<nodemap->num; j++) {
2494                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2495                         break;
2496                 }
2497         }
2498
2499         if (j == nodemap->num) {
2500                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2501                 force_election(rec, pnn, nodemap);
2502                 goto again;
2503         }
2504
2505         /* if recovery master is disconnected we must elect a new recmaster */
2506         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2507                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2508                 force_election(rec, pnn, nodemap);
2509                 goto again;
2510         }
2511
2512         /* grap the nodemap from the recovery master to check if it is banned */
2513         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2514                                    mem_ctx, &recmaster_nodemap);
2515         if (ret != 0) {
2516                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2517                           nodemap->nodes[j].pnn));
2518                 goto again;
2519         }
2520
2521
2522         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2523                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2524                 force_election(rec, pnn, nodemap);
2525                 goto again;
2526         }
2527
2528
2529         /* verify that we have all ip addresses we should have and we dont
2530          * have addresses we shouldnt have.
2531          */ 
2532         if (ctdb->do_checkpublicip) {
2533                 if (verify_ip_allocation(ctdb, pnn) != 0) {
2534                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
2535                         goto again;
2536                 }
2537         }
2538
2539
2540         /* if we are not the recmaster then we do not need to check
2541            if recovery is needed
2542          */
2543         if (pnn != rec->recmaster) {
2544                 goto again;
2545         }
2546
2547
2548         /* ensure our local copies of flags are right */
2549         ret = update_local_flags(rec, nodemap);
2550         if (ret == MONITOR_ELECTION_NEEDED) {
2551                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2552                 force_election(rec, pnn, nodemap);
2553                 goto again;
2554         }
2555         if (ret != MONITOR_OK) {
2556                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2557                 goto again;
2558         }
2559
2560         /* update the list of public ips that a node can handle for
2561            all connected nodes
2562         */
2563         if (ctdb->num_nodes != nodemap->num) {
2564                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2565                 reload_nodes_file(ctdb);
2566                 goto again;
2567         }
2568         for (j=0; j<nodemap->num; j++) {
2569                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2570                         continue;
2571                 }
2572                 /* release any existing data */
2573                 if (ctdb->nodes[j]->public_ips) {
2574                         talloc_free(ctdb->nodes[j]->public_ips);
2575                         ctdb->nodes[j]->public_ips = NULL;
2576                 }
2577                 /* grab a new shiny list of public ips from the node */
2578                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
2579                         ctdb->nodes[j]->pnn, 
2580                         ctdb->nodes,
2581                         &ctdb->nodes[j]->public_ips)) {
2582                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
2583                                 ctdb->nodes[j]->pnn));
2584                         goto again;
2585                 }
2586         }
2587
2588
2589         /* verify that all active nodes agree that we are the recmaster */
2590         switch (verify_recmaster(rec, nodemap, pnn)) {
2591         case MONITOR_RECOVERY_NEEDED:
2592                 /* can not happen */
2593                 goto again;
2594         case MONITOR_ELECTION_NEEDED:
2595                 force_election(rec, pnn, nodemap);
2596                 goto again;
2597         case MONITOR_OK:
2598                 break;
2599         case MONITOR_FAILED:
2600                 goto again;
2601         }
2602
2603
2604         if (rec->need_recovery) {
2605                 /* a previous recovery didn't finish */
2606                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, -1);
2607                 goto again;             
2608         }
2609
2610         /* verify that all active nodes are in normal mode 
2611            and not in recovery mode 
2612          */
2613         switch (verify_recmode(ctdb, nodemap)) {
2614         case MONITOR_RECOVERY_NEEDED:
2615                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2616                 goto again;
2617         case MONITOR_FAILED:
2618                 goto again;
2619         case MONITOR_ELECTION_NEEDED:
2620                 /* can not happen */
2621         case MONITOR_OK:
2622                 break;
2623         }
2624
2625
2626         /* we should have the reclock - check its not stale */
2627         if (ctdb->recovery_lock_fd == -1) {
2628                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2629                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2630                 goto again;
2631         }
2632
2633         if (pread(ctdb->recovery_lock_fd, &c, 1, 0) == -1) {
2634                 DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2635                 close(ctdb->recovery_lock_fd);
2636                 ctdb->recovery_lock_fd = -1;
2637                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2638                 goto again;
2639         }
2640
2641
2642         /* get the nodemap for all active remote nodes
2643          */
2644         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
2645         if (remote_nodemaps == NULL) {
2646                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
2647                 goto again;
2648         }
2649         for(i=0; i<nodemap->num; i++) {
2650                 remote_nodemaps[i] = NULL;
2651         }
2652         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
2653                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
2654                 goto again;
2655         } 
2656
2657         /* verify that all other nodes have the same nodemap as we have
2658         */
2659         for (j=0; j<nodemap->num; j++) {
2660                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2661                         continue;
2662                 }
2663
2664                 if (remote_nodemaps[j] == NULL) {
2665                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
2666                         ctdb_set_culprit(rec, j);
2667
2668                         goto again;
2669                 }
2670
2671                 /* if the nodes disagree on how many nodes there are
2672                    then this is a good reason to try recovery
2673                  */
2674                 if (remote_nodemaps[j]->num != nodemap->num) {
2675                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2676                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
2677                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2678                         goto again;
2679                 }
2680
2681                 /* if the nodes disagree on which nodes exist and are
2682                    active, then that is also a good reason to do recovery
2683                  */
2684                 for (i=0;i<nodemap->num;i++) {
2685                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
2686                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2687                                           nodemap->nodes[j].pnn, i, 
2688                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
2689                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2690                                             vnnmap, nodemap->nodes[j].pnn);
2691                                 goto again;
2692                         }
2693                 }
2694
2695                 /* verify the flags are consistent
2696                 */
2697                 for (i=0; i<nodemap->num; i++) {
2698                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2699                                 continue;
2700                         }
2701                         
2702                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
2703                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
2704                                   nodemap->nodes[j].pnn, 
2705                                   nodemap->nodes[i].pnn, 
2706                                   remote_nodemaps[j]->nodes[i].flags,
2707                                   nodemap->nodes[j].flags));
2708                                 if (i == j) {
2709                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
2710                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
2711                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2712                                                     vnnmap, nodemap->nodes[j].pnn);
2713                                         goto again;
2714                                 } else {
2715                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
2716                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
2717                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2718                                                     vnnmap, nodemap->nodes[j].pnn);
2719                                         goto again;
2720                                 }
2721                         }
2722                 }
2723         }
2724
2725
2726         /* there better be the same number of lmasters in the vnn map
2727            as there are active nodes or we will have to do a recovery
2728          */
2729         if (vnnmap->size != rec->num_active) {
2730                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
2731                           vnnmap->size, rec->num_active));
2732                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2733                 goto again;
2734         }
2735
2736         /* verify that all active nodes in the nodemap also exist in 
2737            the vnnmap.
2738          */
2739         for (j=0; j<nodemap->num; j++) {
2740                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2741                         continue;
2742                 }
2743                 if (nodemap->nodes[j].pnn == pnn) {
2744                         continue;
2745                 }
2746
2747                 for (i=0; i<vnnmap->size; i++) {
2748                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2749                                 break;
2750                         }
2751                 }
2752                 if (i == vnnmap->size) {
2753                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
2754                                   nodemap->nodes[j].pnn));
2755                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2756                         goto again;
2757                 }
2758         }
2759
2760         
2761         /* verify that all other nodes have the same vnnmap
2762            and are from the same generation
2763          */
2764         for (j=0; j<nodemap->num; j++) {
2765                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2766                         continue;
2767                 }
2768                 if (nodemap->nodes[j].pnn == pnn) {
2769                         continue;
2770                 }
2771
2772                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2773                                           mem_ctx, &remote_vnnmap);
2774                 if (ret != 0) {
2775                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
2776                                   nodemap->nodes[j].pnn));
2777                         goto again;
2778                 }
2779
2780                 /* verify the vnnmap generation is the same */
2781                 if (vnnmap->generation != remote_vnnmap->generation) {
2782                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
2783                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
2784                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2785                         goto again;
2786                 }
2787
2788                 /* verify the vnnmap size is the same */
2789                 if (vnnmap->size != remote_vnnmap->size) {
2790                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
2791                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
2792                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2793                         goto again;
2794                 }
2795
2796                 /* verify the vnnmap is the same */
2797                 for (i=0;i<vnnmap->size;i++) {
2798                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
2799                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
2800                                           nodemap->nodes[j].pnn));
2801                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2802                                             vnnmap, nodemap->nodes[j].pnn);
2803                                 goto again;
2804                         }
2805                 }
2806         }
2807
2808         /* we might need to change who has what IP assigned */
2809         if (rec->need_takeover_run) {
2810                 rec->need_takeover_run = false;
2811
2812                 /* execute the "startrecovery" event script on all nodes */
2813                 ret = run_startrecovery_eventscript(rec, nodemap);
2814                 if (ret!=0) {
2815                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
2816                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2817                                     vnnmap, ctdb->pnn);
2818                 }
2819
2820                 ret = ctdb_takeover_run(ctdb, nodemap);
2821                 if (ret != 0) {
2822                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
2823                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2824                                     vnnmap, ctdb->pnn);
2825                 }
2826
2827                 /* execute the "recovered" event script on all nodes */
2828                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
2829 #if 0
2830 // we cant check whether the event completed successfully
2831 // since this script WILL fail if the node is in recovery mode
2832 // and if that race happens, the code here would just cause a second
2833 // cascading recovery.
2834                 if (ret!=0) {
2835                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
2836                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2837                                     vnnmap, ctdb->pnn);
2838                 }
2839 #endif
2840         }
2841
2842
2843         goto again;
2844
2845 }
2846
2847 /*
2848   event handler for when the main ctdbd dies
2849  */
2850 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
2851                                  uint16_t flags, void *private_data)
2852 {
2853         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
2854         _exit(1);
2855 }
2856
2857 /*
2858   called regularly to verify that the recovery daemon is still running
2859  */
2860 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
2861                               struct timeval yt, void *p)
2862 {
2863         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
2864
2865         if (kill(ctdb->recoverd_pid, 0) != 0) {
2866                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
2867
2868                 ctdb_stop_recoverd(ctdb);
2869                 ctdb_stop_keepalive(ctdb);
2870                 ctdb_stop_monitoring(ctdb);
2871                 ctdb_release_all_ips(ctdb);
2872                 if (ctdb->methods != NULL) {
2873                         ctdb->methods->shutdown(ctdb);
2874                 }
2875                 ctdb_event_script(ctdb, "shutdown");
2876
2877                 exit(10);       
2878         }
2879
2880         event_add_timed(ctdb->ev, ctdb, 
2881                         timeval_current_ofs(30, 0),
2882                         ctdb_check_recd, ctdb);
2883 }
2884
2885 static void recd_sig_child_handler(struct event_context *ev,
2886         struct signal_event *se, int signum, int count,
2887         void *dont_care, 
2888         void *private_data)
2889 {
2890 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
2891         int status;
2892         pid_t pid = -1;
2893
2894         while (pid != 0) {
2895                 pid = waitpid(-1, &status, WNOHANG);
2896                 if (pid == -1) {
2897                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
2898                         return;
2899                 }
2900                 if (pid > 0) {
2901                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
2902                 }
2903         }
2904 }
2905
2906 /*
2907   startup the recovery daemon as a child of the main ctdb daemon
2908  */
2909 int ctdb_start_recoverd(struct ctdb_context *ctdb)
2910 {
2911         int ret;
2912         int fd[2];
2913         struct signal_event *se;
2914
2915         if (pipe(fd) != 0) {
2916                 return -1;
2917         }
2918
2919         ctdb->ctdbd_pid = getpid();
2920
2921         ctdb->recoverd_pid = fork();
2922         if (ctdb->recoverd_pid == -1) {
2923                 return -1;
2924         }
2925         
2926         if (ctdb->recoverd_pid != 0) {
2927                 close(fd[0]);
2928                 event_add_timed(ctdb->ev, ctdb, 
2929                                 timeval_current_ofs(30, 0),
2930                                 ctdb_check_recd, ctdb);
2931                 return 0;
2932         }
2933
2934         close(fd[1]);
2935
2936         /* shutdown the transport */
2937         if (ctdb->methods) {
2938                 ctdb->methods->shutdown(ctdb);
2939         }
2940
2941         /* get a new event context */
2942         talloc_free(ctdb->ev);
2943         ctdb->ev = event_context_init(ctdb);
2944
2945         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
2946                      ctdb_recoverd_parent, &fd[0]);     
2947
2948         close(ctdb->daemon.sd);
2949         ctdb->daemon.sd = -1;
2950
2951         srandom(getpid() ^ time(NULL));
2952
2953         /* the recovery daemon does not need to be realtime */
2954         if (ctdb->do_setsched) {
2955                 ctdb_restore_scheduler(ctdb);
2956         }
2957
2958         /* initialise ctdb */
2959         ret = ctdb_socket_connect(ctdb);
2960         if (ret != 0) {
2961                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb\n"));
2962                 exit(1);
2963         }
2964
2965         /* set up a handler to pick up sigchld */
2966         se = event_add_signal(ctdb->ev, ctdb,
2967                                      SIGCHLD, 0,
2968                                      recd_sig_child_handler,
2969                                      ctdb);
2970         if (se == NULL) {
2971                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
2972                 exit(1);
2973         }
2974
2975         monitor_cluster(ctdb);
2976
2977         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
2978         return -1;
2979 }
2980
2981 /*
2982   shutdown the recovery daemon
2983  */
2984 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
2985 {
2986         if (ctdb->recoverd_pid == 0) {
2987                 return;
2988         }
2989
2990         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
2991         kill(ctdb->recoverd_pid, SIGTERM);
2992 }