From Michael Adams,
[sahlberg/ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 struct ban_state {
35         struct ctdb_recoverd *rec;
36         uint32_t banned_node;
37 };
38
39 /*
40   private state of recovery daemon
41  */
42 struct ctdb_recoverd {
43         struct ctdb_context *ctdb;
44         int rec_file_fd;
45         uint32_t recmaster;
46         uint32_t num_active;
47         uint32_t num_connected;
48         struct ctdb_node_map *nodemap;
49         uint32_t last_culprit;
50         uint32_t culprit_counter;
51         struct timeval first_recover_time;
52         struct ban_state **banned_nodes;
53         struct timeval priority_time;
54         bool need_takeover_run;
55         bool need_recovery;
56         uint32_t node_flags;
57         struct timed_event *send_election_te;
58         struct timed_event *election_timeout;
59         struct vacuum_info *vacuum_info;
60 };
61
62 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
63 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
64
65
66 /*
67   unban a node
68  */
69 static void ctdb_unban_node(struct ctdb_recoverd *rec, uint32_t pnn)
70 {
71         struct ctdb_context *ctdb = rec->ctdb;
72
73         DEBUG(DEBUG_NOTICE,("Unbanning node %u\n", pnn));
74
75         if (!ctdb_validate_pnn(ctdb, pnn)) {
76                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_unban_node\n", pnn));
77                 return;
78         }
79
80         /* If we are unbanning a different node then just pass the ban info on */
81         if (pnn != ctdb->pnn) {
82                 TDB_DATA data;
83                 int ret;
84                 
85                 DEBUG(DEBUG_NOTICE,("Unanning remote node %u. Passing the ban request on to the remote node.\n", pnn));
86
87                 data.dptr = (uint8_t *)&pnn;
88                 data.dsize = sizeof(uint32_t);
89
90                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_UNBAN_NODE, data);
91                 if (ret != 0) {
92                         DEBUG(DEBUG_ERR,("Failed to unban node %u\n", pnn));
93                         return;
94                 }
95
96                 return;
97         }
98
99         /* make sure we remember we are no longer banned in case 
100            there is an election */
101         rec->node_flags &= ~NODE_FLAGS_BANNED;
102
103         DEBUG(DEBUG_INFO,("Clearing ban flag on node %u\n", pnn));
104         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, 0, NODE_FLAGS_BANNED);
105
106         if (rec->banned_nodes[pnn] == NULL) {
107                 DEBUG(DEBUG_INFO,("No ban recorded for this node. ctdb_unban_node() request ignored\n"));
108                 return;
109         }
110
111         talloc_free(rec->banned_nodes[pnn]);
112         rec->banned_nodes[pnn] = NULL;
113 }
114
115
116 /*
117   called when a ban has timed out
118  */
119 static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
120 {
121         struct ban_state *state = talloc_get_type(p, struct ban_state);
122         struct ctdb_recoverd *rec = state->rec;
123         uint32_t pnn = state->banned_node;
124
125         DEBUG(DEBUG_NOTICE,("Ban timeout. Node %u is now unbanned\n", pnn));
126         ctdb_unban_node(rec, pnn);
127 }
128
129 /*
130   ban a node for a period of time
131  */
132 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
133 {
134         struct ctdb_context *ctdb = rec->ctdb;
135
136         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
137
138         if (!ctdb_validate_pnn(ctdb, pnn)) {
139                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
140                 return;
141         }
142
143         if (0 == ctdb->tunable.enable_bans) {
144                 DEBUG(DEBUG_INFO,("Bans are disabled - ignoring ban of node %u\n", pnn));
145                 return;
146         }
147
148         /* If we are banning a different node then just pass the ban info on */
149         if (pnn != ctdb->pnn) {
150                 struct ctdb_ban_info b;
151                 TDB_DATA data;
152                 int ret;
153                 
154                 DEBUG(DEBUG_NOTICE,("Banning remote node %u for %u seconds. Passing the ban request on to the remote node.\n", pnn, ban_time));
155
156                 b.pnn = pnn;
157                 b.ban_time = ban_time;
158
159                 data.dptr = (uint8_t *)&b;
160                 data.dsize = sizeof(b);
161
162                 ret = ctdb_send_message(ctdb, pnn, CTDB_SRVID_BAN_NODE, data);
163                 if (ret != 0) {
164                         DEBUG(DEBUG_ERR,("Failed to ban node %u\n", pnn));
165                         return;
166                 }
167
168                 return;
169         }
170
171         DEBUG(DEBUG_NOTICE,("self ban - lowering our election priority\n"));
172         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, NODE_FLAGS_BANNED, 0);
173
174         /* banning ourselves - lower our election priority */
175         rec->priority_time = timeval_current();
176
177         /* make sure we remember we are banned in case there is an 
178            election */
179         rec->node_flags |= NODE_FLAGS_BANNED;
180
181         if (rec->banned_nodes[pnn] != NULL) {
182                 DEBUG(DEBUG_NOTICE,("Re-banning an already banned node. Remove previous ban and set a new ban.\n"));            
183                 talloc_free(rec->banned_nodes[pnn]);
184                 rec->banned_nodes[pnn] = NULL;
185         }
186
187         rec->banned_nodes[pnn] = talloc(rec->banned_nodes, struct ban_state);
188         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes[pnn]);
189
190         rec->banned_nodes[pnn]->rec = rec;
191         rec->banned_nodes[pnn]->banned_node = pnn;
192
193         if (ban_time != 0) {
194                 event_add_timed(ctdb->ev, rec->banned_nodes[pnn], 
195                                 timeval_current_ofs(ban_time, 0),
196                                 ctdb_ban_timeout, rec->banned_nodes[pnn]);
197         }
198 }
199
200 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
201
202
203 /*
204   run the "recovered" eventscript on all nodes
205  */
206 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
207 {
208         TALLOC_CTX *tmp_ctx;
209         uint32_t *nodes;
210
211         tmp_ctx = talloc_new(ctdb);
212         CTDB_NO_MEMORY(ctdb, tmp_ctx);
213
214         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
215         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
216                                         nodes,
217                                         CONTROL_TIMEOUT(), false, tdb_null,
218                                         NULL, NULL,
219                                         NULL) != 0) {
220                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
221
222                 talloc_free(tmp_ctx);
223                 return -1;
224         }
225
226         talloc_free(tmp_ctx);
227         return 0;
228 }
229
230 /*
231   remember the trouble maker
232  */
233 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
234 {
235         struct ctdb_context *ctdb = rec->ctdb;
236
237         if (rec->last_culprit != culprit ||
238             timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
239                 DEBUG(DEBUG_NOTICE,("New recovery culprit %u\n", culprit));
240                 /* either a new node is the culprit, or we've decided to forgive them */
241                 rec->last_culprit = culprit;
242                 rec->first_recover_time = timeval_current();
243                 rec->culprit_counter = 0;
244         }
245         rec->culprit_counter++;
246 }
247
248
249 /* this callback is called for every node that failed to execute the
250    start recovery event
251 */
252 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
253 {
254         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
255
256         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
257
258         ctdb_set_culprit(rec, node_pnn);
259 }
260
261 /*
262   run the "startrecovery" eventscript on all nodes
263  */
264 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
265 {
266         TALLOC_CTX *tmp_ctx;
267         uint32_t *nodes;
268         struct ctdb_context *ctdb = rec->ctdb;
269
270         tmp_ctx = talloc_new(ctdb);
271         CTDB_NO_MEMORY(ctdb, tmp_ctx);
272
273         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
274         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
275                                         nodes,
276                                         CONTROL_TIMEOUT(), false, tdb_null,
277                                         NULL,
278                                         startrecovery_fail_callback,
279                                         rec) != 0) {
280                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
281                 talloc_free(tmp_ctx);
282                 return -1;
283         }
284
285         talloc_free(tmp_ctx);
286         return 0;
287 }
288
289 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
290 {
291         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
292                 DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
293                 return;
294         }
295         ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
296 }
297
298 /*
299   update the node capabilities for all connected nodes
300  */
301 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
302 {
303         uint32_t *nodes;
304         TALLOC_CTX *tmp_ctx;
305
306         tmp_ctx = talloc_new(ctdb);
307         CTDB_NO_MEMORY(ctdb, tmp_ctx);
308
309         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
310         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
311                                         nodes, CONTROL_TIMEOUT(),
312                                         false, tdb_null,
313                                         async_getcap_callback, NULL,
314                                         NULL) != 0) {
315                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
316                 talloc_free(tmp_ctx);
317                 return -1;
318         }
319
320         talloc_free(tmp_ctx);
321         return 0;
322 }
323
324 /*
325   change recovery mode on all nodes
326  */
327 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
328 {
329         TDB_DATA data;
330         uint32_t *nodes;
331         TALLOC_CTX *tmp_ctx;
332
333         tmp_ctx = talloc_new(ctdb);
334         CTDB_NO_MEMORY(ctdb, tmp_ctx);
335
336         /* freeze all nodes */
337         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
338         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
339                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
340                                                 nodes, CONTROL_TIMEOUT(),
341                                                 false, tdb_null,
342                                                 NULL, NULL,
343                                                 NULL) != 0) {
344                         DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
345                         talloc_free(tmp_ctx);
346                         return -1;
347                 }
348         }
349
350
351         data.dsize = sizeof(uint32_t);
352         data.dptr = (unsigned char *)&rec_mode;
353
354         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
355                                         nodes, CONTROL_TIMEOUT(),
356                                         false, data,
357                                         NULL, NULL,
358                                         NULL) != 0) {
359                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
360                 talloc_free(tmp_ctx);
361                 return -1;
362         }
363
364         talloc_free(tmp_ctx);
365         return 0;
366 }
367
368 /*
369   change recovery master on all node
370  */
371 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
372 {
373         TDB_DATA data;
374         TALLOC_CTX *tmp_ctx;
375         uint32_t *nodes;
376
377         tmp_ctx = talloc_new(ctdb);
378         CTDB_NO_MEMORY(ctdb, tmp_ctx);
379
380         data.dsize = sizeof(uint32_t);
381         data.dptr = (unsigned char *)&pnn;
382
383         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
384         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
385                                         nodes,
386                                         CONTROL_TIMEOUT(), false, data,
387                                         NULL, NULL,
388                                         NULL) != 0) {
389                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
390                 talloc_free(tmp_ctx);
391                 return -1;
392         }
393
394         talloc_free(tmp_ctx);
395         return 0;
396 }
397
398
399 /*
400   ensure all other nodes have attached to any databases that we have
401  */
402 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
403                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
404 {
405         int i, j, db, ret;
406         struct ctdb_dbid_map *remote_dbmap;
407
408         /* verify that all other nodes have all our databases */
409         for (j=0; j<nodemap->num; j++) {
410                 /* we dont need to ourself ourselves */
411                 if (nodemap->nodes[j].pnn == pnn) {
412                         continue;
413                 }
414                 /* dont check nodes that are unavailable */
415                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
416                         continue;
417                 }
418
419                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
420                                          mem_ctx, &remote_dbmap);
421                 if (ret != 0) {
422                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
423                         return -1;
424                 }
425
426                 /* step through all local databases */
427                 for (db=0; db<dbmap->num;db++) {
428                         const char *name;
429
430
431                         for (i=0;i<remote_dbmap->num;i++) {
432                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
433                                         break;
434                                 }
435                         }
436                         /* the remote node already have this database */
437                         if (i!=remote_dbmap->num) {
438                                 continue;
439                         }
440                         /* ok so we need to create this database */
441                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
442                                             mem_ctx, &name);
443                         if (ret != 0) {
444                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
445                                 return -1;
446                         }
447                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
448                                            mem_ctx, name, dbmap->dbs[db].persistent);
449                         if (ret != 0) {
450                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
451                                 return -1;
452                         }
453                 }
454         }
455
456         return 0;
457 }
458
459
460 /*
461   ensure we are attached to any databases that anyone else is attached to
462  */
463 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
464                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
465 {
466         int i, j, db, ret;
467         struct ctdb_dbid_map *remote_dbmap;
468
469         /* verify that we have all database any other node has */
470         for (j=0; j<nodemap->num; j++) {
471                 /* we dont need to ourself ourselves */
472                 if (nodemap->nodes[j].pnn == pnn) {
473                         continue;
474                 }
475                 /* dont check nodes that are unavailable */
476                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
477                         continue;
478                 }
479
480                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
481                                          mem_ctx, &remote_dbmap);
482                 if (ret != 0) {
483                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
484                         return -1;
485                 }
486
487                 /* step through all databases on the remote node */
488                 for (db=0; db<remote_dbmap->num;db++) {
489                         const char *name;
490
491                         for (i=0;i<(*dbmap)->num;i++) {
492                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
493                                         break;
494                                 }
495                         }
496                         /* we already have this db locally */
497                         if (i!=(*dbmap)->num) {
498                                 continue;
499                         }
500                         /* ok so we need to create this database and
501                            rebuild dbmap
502                          */
503                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
504                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
505                         if (ret != 0) {
506                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
507                                           nodemap->nodes[j].pnn));
508                                 return -1;
509                         }
510                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
511                                            remote_dbmap->dbs[db].persistent);
512                         if (ret != 0) {
513                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
514                                 return -1;
515                         }
516                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
517                         if (ret != 0) {
518                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
519                                 return -1;
520                         }
521                 }
522         }
523
524         return 0;
525 }
526
527
528 /*
529   pull the remote database contents from one node into the recdb
530  */
531 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
532                                     struct tdb_wrap *recdb, uint32_t dbid)
533 {
534         int ret;
535         TDB_DATA outdata;
536         struct ctdb_control_pulldb_reply *reply;
537         struct ctdb_rec_data *rec;
538         int i;
539         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
540
541         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
542                                CONTROL_TIMEOUT(), &outdata);
543         if (ret != 0) {
544                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
545                 talloc_free(tmp_ctx);
546                 return -1;
547         }
548
549         reply = (struct ctdb_control_pulldb_reply *)outdata.dptr;
550
551         if (outdata.dsize < offsetof(struct ctdb_control_pulldb_reply, data)) {
552                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
553                 talloc_free(tmp_ctx);
554                 return -1;
555         }
556         
557         rec = (struct ctdb_rec_data *)&reply->data[0];
558         
559         for (i=0;
560              i<reply->count;
561              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
562                 TDB_DATA key, data;
563                 struct ctdb_ltdb_header *hdr;
564                 TDB_DATA existing;
565                 
566                 key.dptr = &rec->data[0];
567                 key.dsize = rec->keylen;
568                 data.dptr = &rec->data[key.dsize];
569                 data.dsize = rec->datalen;
570                 
571                 hdr = (struct ctdb_ltdb_header *)data.dptr;
572
573                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
574                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
575                         talloc_free(tmp_ctx);
576                         return -1;
577                 }
578
579                 /* fetch the existing record, if any */
580                 existing = tdb_fetch(recdb->tdb, key);
581                 
582                 if (existing.dptr != NULL) {
583                         struct ctdb_ltdb_header header;
584                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
585                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
586                                          (unsigned)existing.dsize, srcnode));
587                                 free(existing.dptr);
588                                 talloc_free(tmp_ctx);
589                                 return -1;
590                         }
591                         header = *(struct ctdb_ltdb_header *)existing.dptr;
592                         free(existing.dptr);
593                         if (!(header.rsn < hdr->rsn ||
594                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
595                                 continue;
596                         }
597                 }
598                 
599                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
600                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
601                         talloc_free(tmp_ctx);
602                         return -1;                              
603                 }
604         }
605
606         talloc_free(tmp_ctx);
607
608         return 0;
609 }
610
611 /*
612   pull all the remote database contents into the recdb
613  */
614 static int pull_remote_database(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
615                                 struct tdb_wrap *recdb, uint32_t dbid)
616 {
617         int j;
618
619         /* pull all records from all other nodes across onto this node
620            (this merges based on rsn)
621         */
622         for (j=0; j<nodemap->num; j++) {
623                 /* dont merge from nodes that are unavailable */
624                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
625                         continue;
626                 }
627                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
628                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
629                                  nodemap->nodes[j].pnn));
630                         return -1;
631                 }
632         }
633         
634         return 0;
635 }
636
637
638 /*
639   update flags on all active nodes
640  */
641 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
642 {
643         int i;
644         for (i=0;i<nodemap->num;i++) {
645                 struct ctdb_node_flag_change c;
646                 TDB_DATA data;
647
648                 c.pnn = nodemap->nodes[i].pnn;
649                 c.old_flags = nodemap->nodes[i].flags;
650                 c.new_flags = nodemap->nodes[i].flags;
651
652                 data.dptr = (uint8_t *)&c;
653                 data.dsize = sizeof(c);
654
655                 ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
656                                   CTDB_SRVID_NODE_FLAGS_CHANGED, data);
657
658         }
659         return 0;
660 }
661
662 static int update_our_flags_on_all_nodes(struct ctdb_context *ctdb, uint32_t pnn, struct ctdb_node_map *nodemap)
663 {
664         struct ctdb_node_flag_change c;
665         TDB_DATA data;
666
667         c.pnn = nodemap->nodes[pnn].pnn;
668         c.old_flags = nodemap->nodes[pnn].flags;
669         c.new_flags = nodemap->nodes[pnn].flags;
670
671         data.dptr = (uint8_t *)&c;
672         data.dsize = sizeof(c);
673
674         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
675                           CTDB_SRVID_NODE_FLAGS_CHANGED, data);
676
677         return 0;
678 }
679
680 /*
681   ensure all nodes have the same vnnmap we do
682  */
683 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
684                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
685 {
686         int j, ret;
687
688         /* push the new vnn map out to all the nodes */
689         for (j=0; j<nodemap->num; j++) {
690                 /* dont push to nodes that are unavailable */
691                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
692                         continue;
693                 }
694
695                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
696                 if (ret != 0) {
697                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
698                         return -1;
699                 }
700         }
701
702         return 0;
703 }
704
705
706 /*
707   handler for when the admin bans a node
708 */
709 static void ban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
710                         TDB_DATA data, void *private_data)
711 {
712         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
713         struct ctdb_ban_info *b = (struct ctdb_ban_info *)data.dptr;
714         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
715
716         if (data.dsize != sizeof(*b)) {
717                 DEBUG(DEBUG_ERR,("Bad data in ban_handler\n"));
718                 talloc_free(mem_ctx);
719                 return;
720         }
721
722         if (b->pnn != ctdb->pnn) {
723                 DEBUG(DEBUG_ERR,("Got a ban request for pnn:%u but our pnn is %u. Ignoring ban request\n", b->pnn, ctdb->pnn));
724                 return;
725         }
726
727         DEBUG(DEBUG_NOTICE,("Node %u has been banned for %u seconds\n", 
728                  b->pnn, b->ban_time));
729
730         ctdb_ban_node(rec, b->pnn, b->ban_time);
731         talloc_free(mem_ctx);
732 }
733
734 /*
735   handler for when the admin unbans a node
736 */
737 static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid, 
738                           TDB_DATA data, void *private_data)
739 {
740         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
741         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
742         uint32_t pnn;
743
744         if (data.dsize != sizeof(uint32_t)) {
745                 DEBUG(DEBUG_ERR,("Bad data in unban_handler\n"));
746                 talloc_free(mem_ctx);
747                 return;
748         }
749         pnn = *(uint32_t *)data.dptr;
750
751         if (pnn != ctdb->pnn) {
752                 DEBUG(DEBUG_ERR,("Got an unban request for pnn:%u but our pnn is %u. Ignoring unban request\n", pnn, ctdb->pnn));
753                 return;
754         }
755
756         DEBUG(DEBUG_NOTICE,("Node %u has been unbanned.\n", pnn));
757         ctdb_unban_node(rec, pnn);
758         talloc_free(mem_ctx);
759 }
760
761
762 struct vacuum_info {
763         struct vacuum_info *next, *prev;
764         struct ctdb_recoverd *rec;
765         uint32_t srcnode;
766         struct ctdb_db_context *ctdb_db;
767         struct ctdb_control_pulldb_reply *recs;
768         struct ctdb_rec_data *r;
769 };
770
771 static void vacuum_fetch_next(struct vacuum_info *v);
772
773 /*
774   called when a vacuum fetch has completed - just free it and do the next one
775  */
776 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
777 {
778         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
779         talloc_free(state);
780         vacuum_fetch_next(v);
781 }
782
783
784 /*
785   process the next element from the vacuum list
786 */
787 static void vacuum_fetch_next(struct vacuum_info *v)
788 {
789         struct ctdb_call call;
790         struct ctdb_rec_data *r;
791
792         while (v->recs->count) {
793                 struct ctdb_client_call_state *state;
794                 TDB_DATA data;
795                 struct ctdb_ltdb_header *hdr;
796
797                 ZERO_STRUCT(call);
798                 call.call_id = CTDB_NULL_FUNC;
799                 call.flags = CTDB_IMMEDIATE_MIGRATION;
800
801                 r = v->r;
802                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
803                 v->recs->count--;
804
805                 call.key.dptr = &r->data[0];
806                 call.key.dsize = r->keylen;
807
808                 /* ensure we don't block this daemon - just skip a record if we can't get
809                    the chainlock */
810                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
811                         continue;
812                 }
813
814                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
815                 if (data.dptr == NULL) {
816                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
817                         continue;
818                 }
819
820                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
821                         free(data.dptr);
822                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
823                         continue;
824                 }
825                 
826                 hdr = (struct ctdb_ltdb_header *)data.dptr;
827                 if (hdr->dmaster == v->rec->ctdb->pnn) {
828                         /* its already local */
829                         free(data.dptr);
830                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
831                         continue;
832                 }
833
834                 free(data.dptr);
835
836                 state = ctdb_call_send(v->ctdb_db, &call);
837                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
838                 if (state == NULL) {
839                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
840                         talloc_free(v);
841                         return;
842                 }
843                 state->async.fn = vacuum_fetch_callback;
844                 state->async.private_data = v;
845                 return;
846         }
847
848         talloc_free(v);
849 }
850
851
852 /*
853   destroy a vacuum info structure
854  */
855 static int vacuum_info_destructor(struct vacuum_info *v)
856 {
857         DLIST_REMOVE(v->rec->vacuum_info, v);
858         return 0;
859 }
860
861
862 /*
863   handler for vacuum fetch
864 */
865 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
866                                  TDB_DATA data, void *private_data)
867 {
868         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
869         struct ctdb_control_pulldb_reply *recs;
870         int ret, i;
871         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
872         const char *name;
873         struct ctdb_dbid_map *dbmap=NULL;
874         bool persistent = false;
875         struct ctdb_db_context *ctdb_db;
876         struct ctdb_rec_data *r;
877         uint32_t srcnode;
878         struct vacuum_info *v;
879
880         recs = (struct ctdb_control_pulldb_reply *)data.dptr;
881         r = (struct ctdb_rec_data *)&recs->data[0];
882
883         if (recs->count == 0) {
884                 return;
885         }
886
887         srcnode = r->reqid;
888
889         for (v=rec->vacuum_info;v;v=v->next) {
890                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
891                         /* we're already working on records from this node */
892                         return;
893                 }
894         }
895
896         /* work out if the database is persistent */
897         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
898         if (ret != 0) {
899                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
900                 talloc_free(tmp_ctx);
901                 return;
902         }
903
904         for (i=0;i<dbmap->num;i++) {
905                 if (dbmap->dbs[i].dbid == recs->db_id) {
906                         persistent = dbmap->dbs[i].persistent;
907                         break;
908                 }
909         }
910         if (i == dbmap->num) {
911                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
912                 talloc_free(tmp_ctx);
913                 return;         
914         }
915
916         /* find the name of this database */
917         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
918                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
919                 talloc_free(tmp_ctx);
920                 return;
921         }
922
923         /* attach to it */
924         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
925         if (ctdb_db == NULL) {
926                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
927                 talloc_free(tmp_ctx);
928                 return;
929         }
930
931         v = talloc_zero(rec, struct vacuum_info);
932         if (v == NULL) {
933                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
934                 return;
935         }
936
937         v->rec = rec;
938         v->srcnode = srcnode;
939         v->ctdb_db = ctdb_db;
940         v->recs = talloc_memdup(v, recs, data.dsize);
941         if (v->recs == NULL) {
942                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
943                 talloc_free(v);
944                 return;         
945         }
946         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
947
948         DLIST_ADD(rec->vacuum_info, v);
949
950         talloc_set_destructor(v, vacuum_info_destructor);
951
952         vacuum_fetch_next(v);
953 }
954
955
956 /*
957   called when ctdb_wait_timeout should finish
958  */
959 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
960                               struct timeval yt, void *p)
961 {
962         uint32_t *timed_out = (uint32_t *)p;
963         (*timed_out) = 1;
964 }
965
966 /*
967   wait for a given number of seconds
968  */
969 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
970 {
971         uint32_t timed_out = 0;
972         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
973         while (!timed_out) {
974                 event_loop_once(ctdb->ev);
975         }
976 }
977
978 /*
979   called when an election times out (ends)
980  */
981 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
982                                   struct timeval t, void *p)
983 {
984         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
985         rec->election_timeout = NULL;
986 }
987
988
989 /*
990   wait for an election to finish. It finished election_timeout seconds after
991   the last election packet is received
992  */
993 static void ctdb_wait_election(struct ctdb_recoverd *rec)
994 {
995         struct ctdb_context *ctdb = rec->ctdb;
996         while (rec->election_timeout) {
997                 event_loop_once(ctdb->ev);
998         }
999 }
1000
1001 /*
1002   Update our local flags from all remote connected nodes. 
1003   This is only run when we are or we belive we are the recovery master
1004  */
1005 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1006 {
1007         int j;
1008         struct ctdb_context *ctdb = rec->ctdb;
1009         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1010
1011         /* get the nodemap for all active remote nodes and verify
1012            they are the same as for this node
1013          */
1014         for (j=0; j<nodemap->num; j++) {
1015                 struct ctdb_node_map *remote_nodemap=NULL;
1016                 int ret;
1017
1018                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1019                         continue;
1020                 }
1021                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1022                         continue;
1023                 }
1024
1025                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1026                                            mem_ctx, &remote_nodemap);
1027                 if (ret != 0) {
1028                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1029                                   nodemap->nodes[j].pnn));
1030                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1031                         talloc_free(mem_ctx);
1032                         return MONITOR_FAILED;
1033                 }
1034                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1035                         struct ctdb_node_flag_change c;
1036                         TDB_DATA data;
1037
1038                         /* We should tell our daemon about this so it
1039                            updates its flags or else we will log the same 
1040                            message again in the next iteration of recovery.
1041                            Since we are the recovery master we can just as
1042                            well update the flags on all nodes.
1043                         */
1044                         c.pnn = nodemap->nodes[j].pnn;
1045                         c.old_flags = nodemap->nodes[j].flags;
1046                         c.new_flags = remote_nodemap->nodes[j].flags;
1047
1048                         data.dptr = (uint8_t *)&c;
1049                         data.dsize = sizeof(c);
1050
1051                         ctdb_send_message(ctdb, ctdb->pnn,
1052                                         CTDB_SRVID_NODE_FLAGS_CHANGED, 
1053                                         data);
1054
1055                         /* Update our local copy of the flags in the recovery
1056                            daemon.
1057                         */
1058                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1059                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1060                                  nodemap->nodes[j].flags));
1061                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1062
1063                         /* If the BANNED flag has changed for the node
1064                            this is a good reason to do a new election.
1065                          */
1066                         if ((c.old_flags ^ c.new_flags) & NODE_FLAGS_BANNED) {
1067                                 DEBUG(DEBUG_NOTICE,("Remote node %u had different BANNED flags 0x%x, local had 0x%x - trigger a re-election\n",
1068                                  nodemap->nodes[j].pnn, c.new_flags,
1069                                  c.old_flags));
1070                                 talloc_free(mem_ctx);
1071                                 return MONITOR_ELECTION_NEEDED;
1072                         }
1073
1074                 }
1075                 talloc_free(remote_nodemap);
1076         }
1077         talloc_free(mem_ctx);
1078         return MONITOR_OK;
1079 }
1080
1081
1082 /* Create a new random generation ip. 
1083    The generation id can not be the INVALID_GENERATION id
1084 */
1085 static uint32_t new_generation(void)
1086 {
1087         uint32_t generation;
1088
1089         while (1) {
1090                 generation = random();
1091
1092                 if (generation != INVALID_GENERATION) {
1093                         break;
1094                 }
1095         }
1096
1097         return generation;
1098 }
1099
1100
1101 /*
1102   create a temporary working database
1103  */
1104 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1105 {
1106         char *name;
1107         struct tdb_wrap *recdb;
1108         unsigned tdb_flags;
1109
1110         /* open up the temporary recovery database */
1111         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
1112         if (name == NULL) {
1113                 return NULL;
1114         }
1115         unlink(name);
1116
1117         tdb_flags = TDB_NOLOCK;
1118         if (!ctdb->do_setsched) {
1119                 tdb_flags |= TDB_NOMMAP;
1120         }
1121
1122         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1123                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1124         if (recdb == NULL) {
1125                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1126         }
1127
1128         talloc_free(name);
1129
1130         return recdb;
1131 }
1132
1133
1134 /* 
1135    a traverse function for pulling all relevent records from recdb
1136  */
1137 struct recdb_data {
1138         struct ctdb_context *ctdb;
1139         struct ctdb_control_pulldb_reply *recdata;
1140         uint32_t len;
1141         bool failed;
1142 };
1143
1144 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1145 {
1146         struct recdb_data *params = (struct recdb_data *)p;
1147         struct ctdb_rec_data *rec;
1148         struct ctdb_ltdb_header *hdr;
1149
1150         /* skip empty records */
1151         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1152                 return 0;
1153         }
1154
1155         /* update the dmaster field to point to us */
1156         hdr = (struct ctdb_ltdb_header *)data.dptr;
1157         hdr->dmaster = params->ctdb->pnn;
1158
1159         /* add the record to the blob ready to send to the nodes */
1160         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1161         if (rec == NULL) {
1162                 params->failed = true;
1163                 return -1;
1164         }
1165         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1166         if (params->recdata == NULL) {
1167                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1168                          rec->length + params->len, params->recdata->count));
1169                 params->failed = true;
1170                 return -1;
1171         }
1172         params->recdata->count++;
1173         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1174         params->len += rec->length;
1175         talloc_free(rec);
1176
1177         return 0;
1178 }
1179
1180 /*
1181   push the recdb database out to all nodes
1182  */
1183 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1184                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1185 {
1186         struct recdb_data params;
1187         struct ctdb_control_pulldb_reply *recdata;
1188         TDB_DATA outdata;
1189         TALLOC_CTX *tmp_ctx;
1190         uint32_t *nodes;
1191
1192         tmp_ctx = talloc_new(ctdb);
1193         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1194
1195         recdata = talloc_zero(recdb, struct ctdb_control_pulldb_reply);
1196         CTDB_NO_MEMORY(ctdb, recdata);
1197
1198         recdata->db_id = dbid;
1199
1200         params.ctdb = ctdb;
1201         params.recdata = recdata;
1202         params.len = offsetof(struct ctdb_control_pulldb_reply, data);
1203         params.failed = false;
1204
1205         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1206                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1207                 talloc_free(params.recdata);
1208                 talloc_free(tmp_ctx);
1209                 return -1;
1210         }
1211
1212         if (params.failed) {
1213                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1214                 talloc_free(params.recdata);
1215                 talloc_free(tmp_ctx);
1216                 return -1;              
1217         }
1218
1219         recdata = params.recdata;
1220
1221         outdata.dptr = (void *)recdata;
1222         outdata.dsize = params.len;
1223
1224         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1225         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1226                                         nodes,
1227                                         CONTROL_TIMEOUT(), false, outdata,
1228                                         NULL, NULL,
1229                                         NULL) != 0) {
1230                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1231                 talloc_free(recdata);
1232                 talloc_free(tmp_ctx);
1233                 return -1;
1234         }
1235
1236         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1237                   dbid, recdata->count));
1238
1239         talloc_free(recdata);
1240         talloc_free(tmp_ctx);
1241
1242         return 0;
1243 }
1244
1245
1246 /*
1247   go through a full recovery on one database 
1248  */
1249 static int recover_database(struct ctdb_recoverd *rec, 
1250                             TALLOC_CTX *mem_ctx,
1251                             uint32_t dbid,
1252                             uint32_t pnn, 
1253                             struct ctdb_node_map *nodemap,
1254                             uint32_t transaction_id)
1255 {
1256         struct tdb_wrap *recdb;
1257         int ret;
1258         struct ctdb_context *ctdb = rec->ctdb;
1259         TDB_DATA data;
1260         struct ctdb_control_wipe_database w;
1261         uint32_t *nodes;
1262
1263         recdb = create_recdb(ctdb, mem_ctx);
1264         if (recdb == NULL) {
1265                 return -1;
1266         }
1267
1268         /* pull all remote databases onto the recdb */
1269         ret = pull_remote_database(ctdb, nodemap, recdb, dbid);
1270         if (ret != 0) {
1271                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1272                 return -1;
1273         }
1274
1275         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1276
1277         /* wipe all the remote databases. This is safe as we are in a transaction */
1278         w.db_id = dbid;
1279         w.transaction_id = transaction_id;
1280
1281         data.dptr = (void *)&w;
1282         data.dsize = sizeof(w);
1283
1284         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1285         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1286                                         nodes,
1287                                         CONTROL_TIMEOUT(), false, data,
1288                                         NULL, NULL,
1289                                         NULL) != 0) {
1290                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1291                 talloc_free(recdb);
1292                 return -1;
1293         }
1294         
1295         /* push out the correct database. This sets the dmaster and skips 
1296            the empty records */
1297         ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
1298         if (ret != 0) {
1299                 talloc_free(recdb);
1300                 return -1;
1301         }
1302
1303         /* all done with this database */
1304         talloc_free(recdb);
1305
1306         return 0;
1307 }
1308
1309                 
1310 /*
1311   we are the recmaster, and recovery is needed - start a recovery run
1312  */
1313 static int do_recovery(struct ctdb_recoverd *rec, 
1314                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1315                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
1316                        int32_t culprit)
1317 {
1318         struct ctdb_context *ctdb = rec->ctdb;
1319         int i, j, ret;
1320         uint32_t generation;
1321         struct ctdb_dbid_map *dbmap;
1322         TDB_DATA data;
1323         uint32_t *nodes;
1324
1325         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1326
1327         /* if recovery fails, force it again */
1328         rec->need_recovery = true;
1329
1330         if (culprit != -1) {
1331                 ctdb_set_culprit(rec, culprit);
1332         }
1333
1334         if (rec->culprit_counter > 2*nodemap->num) {
1335                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
1336                          culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
1337                          ctdb->tunable.recovery_ban_period));
1338                 ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
1339         }
1340
1341         if (!ctdb_recovery_lock(ctdb, true)) {
1342                 ctdb_set_culprit(rec, pnn);
1343                 DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1344                 return -1;
1345         }
1346
1347         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
1348
1349         /* get a list of all databases */
1350         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1351         if (ret != 0) {
1352                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1353                 return -1;
1354         }
1355
1356         /* we do the db creation before we set the recovery mode, so the freeze happens
1357            on all databases we will be dealing with. */
1358
1359         /* verify that we have all the databases any other node has */
1360         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1361         if (ret != 0) {
1362                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1363                 return -1;
1364         }
1365
1366         /* verify that all other nodes have all our databases */
1367         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1368         if (ret != 0) {
1369                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1370                 return -1;
1371         }
1372
1373         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1374
1375
1376         /* set recovery mode to active on all nodes */
1377         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1378         if (ret != 0) {
1379                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1380                 return -1;
1381         }
1382
1383         /* execute the "startrecovery" event script on all nodes */
1384         ret = run_startrecovery_eventscript(rec, nodemap);
1385         if (ret!=0) {
1386                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1387                 return -1;
1388         }
1389
1390         /* pick a new generation number */
1391         generation = new_generation();
1392
1393         /* change the vnnmap on this node to use the new generation 
1394            number but not on any other nodes.
1395            this guarantees that if we abort the recovery prematurely
1396            for some reason (a node stops responding?)
1397            that we can just return immediately and we will reenter
1398            recovery shortly again.
1399            I.e. we deliberately leave the cluster with an inconsistent
1400            generation id to allow us to abort recovery at any stage and
1401            just restart it from scratch.
1402          */
1403         vnnmap->generation = generation;
1404         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1405         if (ret != 0) {
1406                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1407                 return -1;
1408         }
1409
1410         data.dptr = (void *)&generation;
1411         data.dsize = sizeof(uint32_t);
1412
1413         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1414         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1415                                         nodes,
1416                                         CONTROL_TIMEOUT(), false, data,
1417                                         NULL, NULL,
1418                                         NULL) != 0) {
1419                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1420                 return -1;
1421         }
1422
1423         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1424
1425         for (i=0;i<dbmap->num;i++) {
1426                 if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
1427                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1428                         return -1;
1429                 }
1430         }
1431
1432         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1433
1434         /* commit all the changes */
1435         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1436                                         nodes,
1437                                         CONTROL_TIMEOUT(), false, data,
1438                                         NULL, NULL,
1439                                         NULL) != 0) {
1440                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1441                 return -1;
1442         }
1443
1444         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1445         
1446
1447         /* update the capabilities for all nodes */
1448         ret = update_capabilities(ctdb, nodemap);
1449         if (ret!=0) {
1450                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1451                 return -1;
1452         }
1453
1454         /* build a new vnn map with all the currently active and
1455            unbanned nodes */
1456         generation = new_generation();
1457         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1458         CTDB_NO_MEMORY(ctdb, vnnmap);
1459         vnnmap->generation = generation;
1460         vnnmap->size = 0;
1461         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1462         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1463         for (i=j=0;i<nodemap->num;i++) {
1464                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1465                         continue;
1466                 }
1467                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1468                         /* this node can not be an lmaster */
1469                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1470                         continue;
1471                 }
1472
1473                 vnnmap->size++;
1474                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1475                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1476                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1477
1478         }
1479         if (vnnmap->size == 0) {
1480                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1481                 vnnmap->size++;
1482                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1483                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1484                 vnnmap->map[0] = pnn;
1485         }       
1486
1487         /* update to the new vnnmap on all nodes */
1488         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1489         if (ret != 0) {
1490                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1491                 return -1;
1492         }
1493
1494         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1495
1496         /* update recmaster to point to us for all nodes */
1497         ret = set_recovery_master(ctdb, nodemap, pnn);
1498         if (ret!=0) {
1499                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1500                 return -1;
1501         }
1502
1503         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1504
1505         /*
1506           update all nodes to have the same flags that we have
1507          */
1508         ret = update_flags_on_all_nodes(ctdb, nodemap);
1509         if (ret != 0) {
1510                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes\n"));
1511                 return -1;
1512         }
1513         
1514         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1515
1516         /* disable recovery mode */
1517         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
1518         if (ret != 0) {
1519                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1520                 return -1;
1521         }
1522
1523         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1524
1525         /*
1526           tell nodes to takeover their public IPs
1527          */
1528         rec->need_takeover_run = false;
1529         ret = ctdb_takeover_run(ctdb, nodemap);
1530         if (ret != 0) {
1531                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1532                 return -1;
1533         }
1534         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1535
1536         /* execute the "recovered" event script on all nodes */
1537         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1538         if (ret!=0) {
1539                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1540                 return -1;
1541         }
1542
1543         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1544
1545         /* send a message to all clients telling them that the cluster 
1546            has been reconfigured */
1547         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1548
1549         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1550
1551         rec->need_recovery = false;
1552
1553         /* We just finished a recovery successfully. 
1554            We now wait for rerecovery_timeout before we allow 
1555            another recovery to take place.
1556         */
1557         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1558         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1559         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1560
1561         return 0;
1562 }
1563
1564
1565 /*
1566   elections are won by first checking the number of connected nodes, then
1567   the priority time, then the pnn
1568  */
1569 struct election_message {
1570         uint32_t num_connected;
1571         struct timeval priority_time;
1572         uint32_t pnn;
1573         uint32_t node_flags;
1574 };
1575
1576 /*
1577   form this nodes election data
1578  */
1579 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1580 {
1581         int ret, i;
1582         struct ctdb_node_map *nodemap;
1583         struct ctdb_context *ctdb = rec->ctdb;
1584
1585         ZERO_STRUCTP(em);
1586
1587         em->pnn = rec->ctdb->pnn;
1588         em->priority_time = rec->priority_time;
1589         em->node_flags = rec->node_flags;
1590
1591         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1592         if (ret != 0) {
1593                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1594                 return;
1595         }
1596
1597         for (i=0;i<nodemap->num;i++) {
1598                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1599                         em->num_connected++;
1600                 }
1601         }
1602
1603         /* we shouldnt try to win this election if we cant be a recmaster */
1604         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1605                 em->num_connected = 0;
1606                 em->priority_time = timeval_current();
1607         }
1608
1609         talloc_free(nodemap);
1610 }
1611
1612 /*
1613   see if the given election data wins
1614  */
1615 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1616 {
1617         struct election_message myem;
1618         int cmp = 0;
1619
1620         ctdb_election_data(rec, &myem);
1621
1622         /* we cant win if we dont have the recmaster capability */
1623         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1624                 return false;
1625         }
1626
1627         /* we cant win if we are banned */
1628         if (rec->node_flags & NODE_FLAGS_BANNED) {
1629                 return false;
1630         }       
1631
1632         /* we will automatically win if the other node is banned */
1633         if (em->node_flags & NODE_FLAGS_BANNED) {
1634                 return true;
1635         }
1636
1637         /* try to use the most connected node */
1638         if (cmp == 0) {
1639                 cmp = (int)myem.num_connected - (int)em->num_connected;
1640         }
1641
1642         /* then the longest running node */
1643         if (cmp == 0) {
1644                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1645         }
1646
1647         if (cmp == 0) {
1648                 cmp = (int)myem.pnn - (int)em->pnn;
1649         }
1650
1651         return cmp > 0;
1652 }
1653
1654 /*
1655   send out an election request
1656  */
1657 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1658 {
1659         int ret;
1660         TDB_DATA election_data;
1661         struct election_message emsg;
1662         uint64_t srvid;
1663         struct ctdb_context *ctdb = rec->ctdb;
1664
1665         srvid = CTDB_SRVID_RECOVERY;
1666
1667         ctdb_election_data(rec, &emsg);
1668
1669         election_data.dsize = sizeof(struct election_message);
1670         election_data.dptr  = (unsigned char *)&emsg;
1671
1672
1673         /* send an election message to all active nodes */
1674         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1675
1676
1677         /* A new node that is already frozen has entered the cluster.
1678            The existing nodes are not frozen and dont need to be frozen
1679            until the election has ended and we start the actual recovery
1680         */
1681         if (update_recmaster == true) {
1682                 /* first we assume we will win the election and set 
1683                    recoverymaster to be ourself on the current node
1684                  */
1685                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1686                 if (ret != 0) {
1687                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1688                         return -1;
1689                 }
1690         }
1691
1692
1693         return 0;
1694 }
1695
1696 /*
1697   this function will unban all nodes in the cluster
1698 */
1699 static void unban_all_nodes(struct ctdb_context *ctdb)
1700 {
1701         int ret, i;
1702         struct ctdb_node_map *nodemap;
1703         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1704         
1705         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1706         if (ret != 0) {
1707                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1708                 return;
1709         }
1710
1711         for (i=0;i<nodemap->num;i++) {
1712                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1713                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1714                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1715                 }
1716         }
1717
1718         talloc_free(tmp_ctx);
1719 }
1720
1721
1722 /*
1723   we think we are winning the election - send a broadcast election request
1724  */
1725 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1726 {
1727         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1728         int ret;
1729
1730         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1731         if (ret != 0) {
1732                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1733         }
1734
1735         talloc_free(rec->send_election_te);
1736         rec->send_election_te = NULL;
1737 }
1738
1739 /*
1740   handler for memory dumps
1741 */
1742 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1743                              TDB_DATA data, void *private_data)
1744 {
1745         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1746         TDB_DATA *dump;
1747         int ret;
1748         struct rd_memdump_reply *rd;
1749
1750         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1751                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1752                 return;
1753         }
1754         rd = (struct rd_memdump_reply *)data.dptr;
1755
1756         dump = talloc_zero(tmp_ctx, TDB_DATA);
1757         if (dump == NULL) {
1758                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1759                 talloc_free(tmp_ctx);
1760                 return;
1761         }
1762         ret = ctdb_dump_memory(ctdb, dump);
1763         if (ret != 0) {
1764                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1765                 talloc_free(tmp_ctx);
1766                 return;
1767         }
1768
1769 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1770
1771         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1772         if (ret != 0) {
1773                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1774                 return;
1775         }
1776
1777         talloc_free(tmp_ctx);
1778 }
1779
1780 /*
1781   handler for recovery master elections
1782 */
1783 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1784                              TDB_DATA data, void *private_data)
1785 {
1786         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1787         int ret;
1788         struct election_message *em = (struct election_message *)data.dptr;
1789         TALLOC_CTX *mem_ctx;
1790
1791         /* we got an election packet - update the timeout for the election */
1792         talloc_free(rec->election_timeout);
1793         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1794                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1795                                                 ctdb_election_timeout, rec);
1796
1797         mem_ctx = talloc_new(ctdb);
1798
1799         /* someone called an election. check their election data
1800            and if we disagree and we would rather be the elected node, 
1801            send a new election message to all other nodes
1802          */
1803         if (ctdb_election_win(rec, em)) {
1804                 if (!rec->send_election_te) {
1805                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1806                                                                 timeval_current_ofs(0, 500000),
1807                                                                 election_send_request, rec);
1808                 }
1809                 talloc_free(mem_ctx);
1810                 /*unban_all_nodes(ctdb);*/
1811                 return;
1812         }
1813         
1814         /* we didn't win */
1815         talloc_free(rec->send_election_te);
1816         rec->send_election_te = NULL;
1817
1818         /* release the recmaster lock */
1819         if (em->pnn != ctdb->pnn &&
1820             ctdb->recovery_lock_fd != -1) {
1821                 close(ctdb->recovery_lock_fd);
1822                 ctdb->recovery_lock_fd = -1;
1823                 unban_all_nodes(ctdb);
1824         }
1825
1826         /* ok, let that guy become recmaster then */
1827         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1828         if (ret != 0) {
1829                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
1830                 talloc_free(mem_ctx);
1831                 return;
1832         }
1833
1834         /* release any bans */
1835         rec->last_culprit = (uint32_t)-1;
1836         talloc_free(rec->banned_nodes);
1837         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
1838         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
1839
1840         talloc_free(mem_ctx);
1841         return;
1842 }
1843
1844
1845 /*
1846   force the start of the election process
1847  */
1848 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1849                            struct ctdb_node_map *nodemap)
1850 {
1851         int ret;
1852         struct ctdb_context *ctdb = rec->ctdb;
1853
1854         /* set all nodes to recovery mode to stop all internode traffic */
1855         ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_ACTIVE);
1856         if (ret != 0) {
1857                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1858                 return;
1859         }
1860
1861         talloc_free(rec->election_timeout);
1862         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1863                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1864                                                 ctdb_election_timeout, rec);
1865
1866         ret = send_election_request(rec, pnn, true);
1867         if (ret!=0) {
1868                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
1869                 return;
1870         }
1871
1872         /* wait for a few seconds to collect all responses */
1873         ctdb_wait_election(rec);
1874 }
1875
1876
1877
1878 /*
1879   handler for when a node changes its flags
1880 */
1881 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1882                             TDB_DATA data, void *private_data)
1883 {
1884         int ret;
1885         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1886         struct ctdb_node_map *nodemap=NULL;
1887         TALLOC_CTX *tmp_ctx;
1888         uint32_t changed_flags;
1889         int i;
1890         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1891
1892         if (data.dsize != sizeof(*c)) {
1893                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1894                 return;
1895         }
1896
1897         tmp_ctx = talloc_new(ctdb);
1898         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1899
1900         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1901         if (ret != 0) {
1902                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1903                 talloc_free(tmp_ctx);
1904                 return;         
1905         }
1906
1907
1908         for (i=0;i<nodemap->num;i++) {
1909                 if (nodemap->nodes[i].pnn == c->pnn) break;
1910         }
1911
1912         if (i == nodemap->num) {
1913                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1914                 talloc_free(tmp_ctx);
1915                 return;
1916         }
1917
1918         changed_flags = c->old_flags ^ c->new_flags;
1919
1920         /* Dont let messages from remote nodes change the DISCONNECTED flag. 
1921            This flag is handled locally based on whether the local node
1922            can communicate with the node or not.
1923         */
1924         c->new_flags &= ~NODE_FLAGS_DISCONNECTED;
1925         if (nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED) {
1926                 c->new_flags |= NODE_FLAGS_DISCONNECTED;
1927         }
1928
1929         if (nodemap->nodes[i].flags != c->new_flags) {
1930                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1931         }
1932
1933         nodemap->nodes[i].flags = c->new_flags;
1934
1935         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1936                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
1937
1938         if (ret == 0) {
1939                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
1940                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
1941         }
1942         
1943         if (ret == 0 &&
1944             ctdb->recovery_master == ctdb->pnn &&
1945             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
1946                 /* Only do the takeover run if the perm disabled or unhealthy
1947                    flags changed since these will cause an ip failover but not
1948                    a recovery.
1949                    If the node became disconnected or banned this will also
1950                    lead to an ip address failover but that is handled 
1951                    during recovery
1952                 */
1953                 if (changed_flags & NODE_FLAGS_DISABLED) {
1954                         rec->need_takeover_run = true;
1955                 }
1956         }
1957
1958         talloc_free(tmp_ctx);
1959 }
1960
1961
1962
1963 struct verify_recmode_normal_data {
1964         uint32_t count;
1965         enum monitor_result status;
1966 };
1967
1968 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
1969 {
1970         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
1971
1972
1973         /* one more node has responded with recmode data*/
1974         rmdata->count--;
1975
1976         /* if we failed to get the recmode, then return an error and let
1977            the main loop try again.
1978         */
1979         if (state->state != CTDB_CONTROL_DONE) {
1980                 if (rmdata->status == MONITOR_OK) {
1981                         rmdata->status = MONITOR_FAILED;
1982                 }
1983                 return;
1984         }
1985
1986         /* if we got a response, then the recmode will be stored in the
1987            status field
1988         */
1989         if (state->status != CTDB_RECOVERY_NORMAL) {
1990                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
1991                 rmdata->status = MONITOR_RECOVERY_NEEDED;
1992         }
1993
1994         return;
1995 }
1996
1997
1998 /* verify that all nodes are in normal recovery mode */
1999 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2000 {
2001         struct verify_recmode_normal_data *rmdata;
2002         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2003         struct ctdb_client_control_state *state;
2004         enum monitor_result status;
2005         int j;
2006         
2007         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2008         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2009         rmdata->count  = 0;
2010         rmdata->status = MONITOR_OK;
2011
2012         /* loop over all active nodes and send an async getrecmode call to 
2013            them*/
2014         for (j=0; j<nodemap->num; j++) {
2015                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2016                         continue;
2017                 }
2018                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2019                                         CONTROL_TIMEOUT(), 
2020                                         nodemap->nodes[j].pnn);
2021                 if (state == NULL) {
2022                         /* we failed to send the control, treat this as 
2023                            an error and try again next iteration
2024                         */                      
2025                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2026                         talloc_free(mem_ctx);
2027                         return MONITOR_FAILED;
2028                 }
2029
2030                 /* set up the callback functions */
2031                 state->async.fn = verify_recmode_normal_callback;
2032                 state->async.private_data = rmdata;
2033
2034                 /* one more control to wait for to complete */
2035                 rmdata->count++;
2036         }
2037
2038
2039         /* now wait for up to the maximum number of seconds allowed
2040            or until all nodes we expect a response from has replied
2041         */
2042         while (rmdata->count > 0) {
2043                 event_loop_once(ctdb->ev);
2044         }
2045
2046         status = rmdata->status;
2047         talloc_free(mem_ctx);
2048         return status;
2049 }
2050
2051
2052 struct verify_recmaster_data {
2053         struct ctdb_recoverd *rec;
2054         uint32_t count;
2055         uint32_t pnn;
2056         enum monitor_result status;
2057 };
2058
2059 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2060 {
2061         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2062
2063
2064         /* one more node has responded with recmaster data*/
2065         rmdata->count--;
2066
2067         /* if we failed to get the recmaster, then return an error and let
2068            the main loop try again.
2069         */
2070         if (state->state != CTDB_CONTROL_DONE) {
2071                 if (rmdata->status == MONITOR_OK) {
2072                         rmdata->status = MONITOR_FAILED;
2073                 }
2074                 return;
2075         }
2076
2077         /* if we got a response, then the recmaster will be stored in the
2078            status field
2079         */
2080         if (state->status != rmdata->pnn) {
2081                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2082                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2083                 rmdata->status = MONITOR_ELECTION_NEEDED;
2084         }
2085
2086         return;
2087 }
2088
2089
2090 /* verify that all nodes agree that we are the recmaster */
2091 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2092 {
2093         struct ctdb_context *ctdb = rec->ctdb;
2094         struct verify_recmaster_data *rmdata;
2095         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2096         struct ctdb_client_control_state *state;
2097         enum monitor_result status;
2098         int j;
2099         
2100         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2101         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2102         rmdata->rec    = rec;
2103         rmdata->count  = 0;
2104         rmdata->pnn    = pnn;
2105         rmdata->status = MONITOR_OK;
2106
2107         /* loop over all active nodes and send an async getrecmaster call to 
2108            them*/
2109         for (j=0; j<nodemap->num; j++) {
2110                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2111                         continue;
2112                 }
2113                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2114                                         CONTROL_TIMEOUT(),
2115                                         nodemap->nodes[j].pnn);
2116                 if (state == NULL) {
2117                         /* we failed to send the control, treat this as 
2118                            an error and try again next iteration
2119                         */                      
2120                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2121                         talloc_free(mem_ctx);
2122                         return MONITOR_FAILED;
2123                 }
2124
2125                 /* set up the callback functions */
2126                 state->async.fn = verify_recmaster_callback;
2127                 state->async.private_data = rmdata;
2128
2129                 /* one more control to wait for to complete */
2130                 rmdata->count++;
2131         }
2132
2133
2134         /* now wait for up to the maximum number of seconds allowed
2135            or until all nodes we expect a response from has replied
2136         */
2137         while (rmdata->count > 0) {
2138                 event_loop_once(ctdb->ev);
2139         }
2140
2141         status = rmdata->status;
2142         talloc_free(mem_ctx);
2143         return status;
2144 }
2145
2146 /*
2147   this function writes the number of connected nodes we have for this pnn
2148   to the pnn slot in the reclock file
2149 */
2150 static void
2151 ctdb_recoverd_write_pnn_connect_count(struct ctdb_recoverd *rec)
2152 {
2153         const char count = rec->num_connected;
2154         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
2155
2156         if (rec->rec_file_fd == -1) {
2157                 DEBUG(DEBUG_CRIT,(__location__ " Unable to write pnn count. pnnfile is not open.\n"));
2158                 return;
2159         } 
2160
2161         if (pwrite(rec->rec_file_fd, &count, 1, ctdb->pnn) == -1) {
2162                 DEBUG(DEBUG_CRIT, (__location__ " Failed to write pnn count\n"));
2163                 close(rec->rec_file_fd);
2164                 rec->rec_file_fd = -1;
2165         }
2166 }
2167
2168 /* 
2169   this function opens the reclock file and sets a byterage lock for the single
2170   byte at position pnn+1.
2171   the existence/non-existence of such a lock provides an alternative mechanism
2172   to know whether a remote node(recovery daemon) is running or not.
2173 */
2174 static void
2175 ctdb_recoverd_get_pnn_lock(struct ctdb_recoverd *rec)
2176 {
2177         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
2178         struct flock lock;
2179         char *pnnfile = NULL;
2180
2181         DEBUG(DEBUG_INFO, ("Setting PNN lock for pnn:%d\n", ctdb->pnn));
2182
2183         if (rec->rec_file_fd != -1) {
2184                 close(rec->rec_file_fd);
2185                 rec->rec_file_fd = -1;
2186         }
2187
2188         pnnfile = talloc_asprintf(rec, "%s.pnn", ctdb->recovery_lock_file);
2189         CTDB_NO_MEMORY_FATAL(ctdb, pnnfile);
2190
2191         rec->rec_file_fd = open(pnnfile, O_RDWR|O_CREAT, 0600);
2192         if (rec->rec_file_fd == -1) {
2193                 DEBUG(DEBUG_CRIT,(__location__ " Unable to open %s - (%s)\n", 
2194                          pnnfile, strerror(errno)));
2195                 talloc_free(pnnfile);
2196                 return;
2197         }
2198
2199         set_close_on_exec(rec->rec_file_fd);
2200         lock.l_type = F_WRLCK;
2201         lock.l_whence = SEEK_SET;
2202         lock.l_start = ctdb->pnn;
2203         lock.l_len = 1;
2204         lock.l_pid = 0;
2205
2206         if (fcntl(rec->rec_file_fd, F_SETLK, &lock) != 0) {
2207                 close(rec->rec_file_fd);
2208                 rec->rec_file_fd = -1;
2209                 DEBUG(DEBUG_CRIT,(__location__ " Failed to get pnn lock on '%s'\n", pnnfile));
2210                 talloc_free(pnnfile);
2211                 return;
2212         }
2213
2214
2215         DEBUG(DEBUG_NOTICE,(__location__ " Got pnn lock on '%s'\n", pnnfile));
2216         talloc_free(pnnfile);
2217
2218         /* we start out with 0 connected nodes */
2219         ctdb_recoverd_write_pnn_connect_count(rec);
2220 }
2221
2222 /*
2223   called when we need to do the periodical reclock pnn count update
2224  */
2225 static void ctdb_update_pnn_count(struct event_context *ev, struct timed_event *te, 
2226                                   struct timeval t, void *p)
2227 {
2228         int i, count;
2229         struct ctdb_recoverd *rec     = talloc_get_type(p, struct ctdb_recoverd);
2230         struct ctdb_context *ctdb     = rec->ctdb;
2231         struct ctdb_node_map *nodemap = rec->nodemap;
2232
2233         /* close and reopen the pnn lock file */
2234         ctdb_recoverd_get_pnn_lock(rec);
2235
2236         ctdb_recoverd_write_pnn_connect_count(rec);
2237
2238         event_add_timed(rec->ctdb->ev, rec->ctdb,
2239                 timeval_current_ofs(ctdb->tunable.reclock_ping_period, 0), 
2240                 ctdb_update_pnn_count, rec);
2241
2242         /* check if there is a split cluster and yeld the recmaster role
2243            it the other half of the cluster is larger 
2244         */
2245         DEBUG(DEBUG_DEBUG, ("CHECK FOR SPLIT CLUSTER\n"));
2246         if (rec->nodemap == NULL) {
2247                 return;
2248         }
2249         if (rec->rec_file_fd == -1) {
2250                 return;
2251         }
2252         /* only test this if we think we are the recmaster */
2253         if (ctdb->pnn != rec->recmaster) {
2254                 DEBUG(DEBUG_DEBUG, ("We are not recmaster, skip test\n"));
2255                 return;
2256         }
2257         if (ctdb->recovery_lock_fd == -1) {
2258                 DEBUG(DEBUG_ERR, (__location__ " Lost reclock pnn file. Yielding recmaster role\n"));
2259                 close(ctdb->recovery_lock_fd);
2260                 ctdb->recovery_lock_fd = -1;
2261                 force_election(rec, ctdb->pnn, rec->nodemap);
2262                 return;
2263         }
2264         for (i=0; i<nodemap->num; i++) {
2265                 /* we dont need to check ourself */
2266                 if (nodemap->nodes[i].pnn == ctdb->pnn) {
2267                         continue;
2268                 }
2269                 /* dont check nodes that are connected to us */
2270                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2271                         continue;
2272                 }
2273                 /* check if the node is "connected" and how connected it it */
2274                 count = ctdb_read_pnn_lock(rec->rec_file_fd, nodemap->nodes[i].pnn);
2275                 if (count < 0) {
2276                         continue;
2277                 }
2278                 /* check if that node is more connected that us */
2279                 if (count > rec->num_connected) {
2280                         DEBUG(DEBUG_ERR, ("DISCONNECTED Node %u is more connected than we are, yielding recmaster role\n", nodemap->nodes[i].pnn));
2281                         close(ctdb->recovery_lock_fd);
2282                         ctdb->recovery_lock_fd = -1;
2283                         force_election(rec, ctdb->pnn, rec->nodemap);
2284                         return;
2285                 }
2286         }
2287 }
2288
2289 /* called to check that the allocation of public ip addresses is ok.
2290 */
2291 static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
2292 {
2293         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2294         struct ctdb_all_public_ips *ips = NULL;
2295         struct ctdb_uptime *uptime1 = NULL;
2296         struct ctdb_uptime *uptime2 = NULL;
2297         int ret, j;
2298
2299         ret = ctdb_ctrl_uptime(ctdb, ctdb, CONTROL_TIMEOUT(),
2300                                 CTDB_CURRENT_NODE, &uptime1);
2301         if (ret != 0) {
2302                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2303                 talloc_free(mem_ctx);
2304                 return -1;
2305         }
2306
2307         /* read the ip allocation from the local node */
2308         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2309         if (ret != 0) {
2310                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2311                 talloc_free(mem_ctx);
2312                 return -1;
2313         }
2314
2315         ret = ctdb_ctrl_uptime(ctdb, ctdb, CONTROL_TIMEOUT(),
2316                                 CTDB_CURRENT_NODE, &uptime2);
2317         if (ret != 0) {
2318                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2319                 talloc_free(mem_ctx);
2320                 return -1;
2321         }
2322
2323         /* skip the check if the startrecovery time has changed */
2324         if (timeval_compare(&uptime1->last_recovery_started,
2325                             &uptime2->last_recovery_started) != 0) {
2326                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2327                 return 0;
2328         }
2329
2330         /* skip the check if the endrecovery time has changed */
2331         if (timeval_compare(&uptime1->last_recovery_finished,
2332                             &uptime2->last_recovery_finished) != 0) {
2333                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2334                 return 0;
2335         }
2336
2337         /* skip the check if we have started but not finished recovery */
2338         if (timeval_compare(&uptime1->last_recovery_finished,
2339                             &uptime1->last_recovery_started) != 1) {
2340                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery. skipping public ip address check\n"));
2341
2342                 return 0;
2343         }
2344
2345         /* verify that we have the ip addresses we should have
2346            and we dont have ones we shouldnt have.
2347            if we find an inconsistency we set recmode to
2348            active on the local node and wait for the recmaster
2349            to do a full blown recovery
2350         */
2351         for (j=0; j<ips->num; j++) {
2352                 if (ips->ips[j].pnn == pnn) {
2353                         if (!ctdb_sys_have_ip(ips->ips[j].sin)) {
2354                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
2355                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2356                                 if (ret != 0) {
2357                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2358
2359                                         talloc_free(mem_ctx);
2360                                         return -1;
2361                                 }
2362                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2363                                 if (ret != 0) {
2364                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2365
2366                                         talloc_free(mem_ctx);
2367                                         return -1;
2368                                 }
2369                         }
2370                 } else {
2371                         if (ctdb_sys_have_ip(ips->ips[j].sin)) {
2372                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", inet_ntoa(ips->ips[j].sin.sin_addr)));
2373                                 ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2374                                 if (ret != 0) {
2375                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2376
2377                                         talloc_free(mem_ctx);
2378                                         return -1;
2379                                 }
2380                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2381                                 if (ret != 0) {
2382                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2383
2384                                         talloc_free(mem_ctx);
2385                                         return -1;
2386                                 }
2387                         }
2388                 }
2389         }
2390
2391         talloc_free(mem_ctx);
2392         return 0;
2393 }
2394
2395 /*
2396   the main monitoring loop
2397  */
2398 static void monitor_cluster(struct ctdb_context *ctdb)
2399 {
2400         uint32_t pnn;
2401         TALLOC_CTX *mem_ctx=NULL;
2402         struct ctdb_node_map *nodemap=NULL;
2403         struct ctdb_node_map *remote_nodemap=NULL;
2404         struct ctdb_vnn_map *vnnmap=NULL;
2405         struct ctdb_vnn_map *remote_vnnmap=NULL;
2406         int32_t debug_level;
2407         int i, j, ret;
2408         struct ctdb_recoverd *rec;
2409         char c;
2410
2411         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2412
2413         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2414         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2415
2416         rec->ctdb = ctdb;
2417         rec->banned_nodes = talloc_zero_array(rec, struct ban_state *, ctdb->num_nodes);
2418         CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
2419
2420         rec->priority_time = timeval_current();
2421
2422         /* open the rec file fd and lock our slot */
2423         rec->rec_file_fd = -1;
2424         ctdb_recoverd_get_pnn_lock(rec);
2425
2426         /* register a message port for sending memory dumps */
2427         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2428
2429         /* register a message port for recovery elections */
2430         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2431
2432         /* and one for when nodes are disabled/enabled */
2433         ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
2434
2435         /* and one for when nodes are banned */
2436         ctdb_set_message_handler(ctdb, CTDB_SRVID_BAN_NODE, ban_handler, rec);
2437
2438         /* and one for when nodes are unbanned */
2439         ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
2440
2441         /* register a message port for vacuum fetch */
2442         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2443
2444         /* update the reclock pnn file connected count on a regular basis */
2445         event_add_timed(ctdb->ev, ctdb,
2446                 timeval_current_ofs(ctdb->tunable.reclock_ping_period, 0), 
2447                 ctdb_update_pnn_count, rec);
2448
2449 again:
2450         if (mem_ctx) {
2451                 talloc_free(mem_ctx);
2452                 mem_ctx = NULL;
2453         }
2454         mem_ctx = talloc_new(ctdb);
2455         if (!mem_ctx) {
2456                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2457                 exit(-1);
2458         }
2459
2460         /* we only check for recovery once every second */
2461         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2462
2463         /* verify that the main daemon is still running */
2464         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2465                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2466                 exit(-1);
2467         }
2468
2469         if (rec->election_timeout) {
2470                 /* an election is in progress */
2471                 goto again;
2472         }
2473
2474         /* read the debug level from the parent and update locally */
2475         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2476         if (ret !=0) {
2477                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2478                 goto again;
2479         }
2480         LogLevel = debug_level;
2481
2482
2483         /* We must check if we need to ban a node here but we want to do this
2484            as early as possible so we dont wait until we have pulled the node
2485            map from the local node. thats why we have the hardcoded value 20
2486         */
2487         if (rec->culprit_counter > 20) {
2488                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u failures in %.0f seconds - banning it for %u seconds\n",
2489                          rec->last_culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
2490                          ctdb->tunable.recovery_ban_period));
2491                 ctdb_ban_node(rec, rec->last_culprit, ctdb->tunable.recovery_ban_period);
2492         }
2493
2494         /* get relevant tunables */
2495         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2496         if (ret != 0) {
2497                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2498                 goto again;
2499         }
2500
2501         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2502         if (pnn == (uint32_t)-1) {
2503                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2504                 goto again;
2505         }
2506
2507         /* get the vnnmap */
2508         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2509         if (ret != 0) {
2510                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2511                 goto again;
2512         }
2513
2514
2515         /* get number of nodes */
2516         if (rec->nodemap) {
2517                 talloc_free(rec->nodemap);
2518                 rec->nodemap = NULL;
2519                 nodemap=NULL;
2520         }
2521         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2522         if (ret != 0) {
2523                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2524                 goto again;
2525         }
2526         nodemap = rec->nodemap;
2527
2528         /* check which node is the recovery master */
2529         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2530         if (ret != 0) {
2531                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2532                 goto again;
2533         }
2534
2535         if (rec->recmaster == (uint32_t)-1) {
2536                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2537                 force_election(rec, pnn, nodemap);
2538                 goto again;
2539         }
2540         
2541         /* check that we (recovery daemon) and the local ctdb daemon
2542            agrees on whether we are banned or not
2543         */
2544         if (nodemap->nodes[pnn].flags & NODE_FLAGS_BANNED) {
2545                 if (rec->banned_nodes[pnn] == NULL) {
2546                         if (rec->recmaster == pnn) {
2547                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster thinks this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2548
2549                                 ctdb_unban_node(rec, pnn);
2550                         } else {
2551                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster thinks this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2552                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2553                                 ctdb_set_culprit(rec, pnn);
2554                         }
2555                         goto again;
2556                 }
2557         } else {
2558                 if (rec->banned_nodes[pnn] != NULL) {
2559                         if (rec->recmaster == pnn) {
2560                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on recmaster does not think this node is BANNED but the recovery master disagrees. Unbanning the node\n"));
2561
2562                                 ctdb_unban_node(rec, pnn);
2563                         } else {
2564                                 DEBUG(DEBUG_NOTICE,("Local ctdb daemon on non-recmaster does not think this node is BANNED but the recovery master disagrees. Re-banning the node\n"));
2565
2566                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2567                                 ctdb_set_culprit(rec, pnn);
2568                         }
2569                         goto again;
2570                 }
2571         }
2572
2573         /* remember our own node flags */
2574         rec->node_flags = nodemap->nodes[pnn].flags;
2575
2576         /* count how many active nodes there are */
2577         rec->num_active    = 0;
2578         rec->num_connected = 0;
2579         for (i=0; i<nodemap->num; i++) {
2580                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2581                         rec->num_active++;
2582                 }
2583                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2584                         rec->num_connected++;
2585                 }
2586         }
2587
2588
2589         /* verify that the recmaster node is still active */
2590         for (j=0; j<nodemap->num; j++) {
2591                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2592                         break;
2593                 }
2594         }
2595
2596         if (j == nodemap->num) {
2597                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2598                 force_election(rec, pnn, nodemap);
2599                 goto again;
2600         }
2601
2602         /* if recovery master is disconnected we must elect a new recmaster */
2603         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2604                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2605                 force_election(rec, pnn, nodemap);
2606                 goto again;
2607         }
2608
2609         /* grap the nodemap from the recovery master to check if it is banned */
2610         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2611                                    mem_ctx, &remote_nodemap);
2612         if (ret != 0) {
2613                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2614                           nodemap->nodes[j].pnn));
2615                 goto again;
2616         }
2617
2618
2619         if (remote_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2620                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2621                 force_election(rec, pnn, nodemap);
2622                 goto again;
2623         }
2624
2625
2626         /* verify that we and the recmaster agrees on our flags */
2627         if (nodemap->nodes[pnn].flags != remote_nodemap->nodes[pnn].flags) {
2628                 DEBUG(DEBUG_ERR, (__location__ " Recmaster disagrees on our flags flags:0x%x recmaster_flags:0x%x  Broadcasting out flags.\n", nodemap->nodes[pnn].flags, remote_nodemap->nodes[pnn].flags));
2629
2630                 update_our_flags_on_all_nodes(ctdb, pnn, nodemap);
2631         }
2632
2633
2634         /* verify that we have all ip addresses we should have and we dont
2635          * have addresses we shouldnt have.
2636          */ 
2637         if (verify_ip_allocation(ctdb, pnn) != 0) {
2638                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
2639                 goto again;
2640         }
2641
2642
2643         /* if we are not the recmaster then we do not need to check
2644            if recovery is needed
2645          */
2646         if (pnn != rec->recmaster) {
2647                 goto again;
2648         }
2649
2650
2651         /* ensure our local copies of flags are right */
2652         ret = update_local_flags(rec, nodemap);
2653         if (ret == MONITOR_ELECTION_NEEDED) {
2654                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2655                 force_election(rec, pnn, nodemap);
2656                 goto again;
2657         }
2658         if (ret != MONITOR_OK) {
2659                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2660                 goto again;
2661         }
2662
2663         /* update the list of public ips that a node can handle for
2664            all connected nodes
2665         */
2666         for (j=0; j<nodemap->num; j++) {
2667                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2668                         continue;
2669                 }
2670                 /* release any existing data */
2671                 if (ctdb->nodes[j]->public_ips) {
2672                         talloc_free(ctdb->nodes[j]->public_ips);
2673                         ctdb->nodes[j]->public_ips = NULL;
2674                 }
2675                 /* grab a new shiny list of public ips from the node */
2676                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
2677                         ctdb->nodes[j]->pnn, 
2678                         ctdb->nodes,
2679                         &ctdb->nodes[j]->public_ips)) {
2680                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
2681                                 ctdb->nodes[j]->pnn));
2682                         goto again;
2683                 }
2684         }
2685
2686
2687         /* verify that all active nodes agree that we are the recmaster */
2688         switch (verify_recmaster(rec, nodemap, pnn)) {
2689         case MONITOR_RECOVERY_NEEDED:
2690                 /* can not happen */
2691                 goto again;
2692         case MONITOR_ELECTION_NEEDED:
2693                 force_election(rec, pnn, nodemap);
2694                 goto again;
2695         case MONITOR_OK:
2696                 break;
2697         case MONITOR_FAILED:
2698                 goto again;
2699         }
2700
2701
2702         if (rec->need_recovery) {
2703                 /* a previous recovery didn't finish */
2704                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, -1);
2705                 goto again;             
2706         }
2707
2708         /* verify that all active nodes are in normal mode 
2709            and not in recovery mode 
2710          */
2711         switch (verify_recmode(ctdb, nodemap)) {
2712         case MONITOR_RECOVERY_NEEDED:
2713                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2714                 goto again;
2715         case MONITOR_FAILED:
2716                 goto again;
2717         case MONITOR_ELECTION_NEEDED:
2718                 /* can not happen */
2719         case MONITOR_OK:
2720                 break;
2721         }
2722
2723
2724         /* we should have the reclock - check its not stale */
2725         if (ctdb->recovery_lock_fd == -1) {
2726                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2727                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2728                 goto again;
2729         }
2730
2731         if (pread(ctdb->recovery_lock_fd, &c, 1, 0) == -1) {
2732                 DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2733                 close(ctdb->recovery_lock_fd);
2734                 ctdb->recovery_lock_fd = -1;
2735                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2736                 goto again;
2737         }
2738
2739         /* get the nodemap for all active remote nodes and verify
2740            they are the same as for this node
2741          */
2742         for (j=0; j<nodemap->num; j++) {
2743                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2744                         continue;
2745                 }
2746                 if (nodemap->nodes[j].pnn == pnn) {
2747                         continue;
2748                 }
2749
2750                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2751                                            mem_ctx, &remote_nodemap);
2752                 if (ret != 0) {
2753                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
2754                                   nodemap->nodes[j].pnn));
2755                         goto again;
2756                 }
2757
2758                 /* if the nodes disagree on how many nodes there are
2759                    then this is a good reason to try recovery
2760                  */
2761                 if (remote_nodemap->num != nodemap->num) {
2762                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
2763                                   nodemap->nodes[j].pnn, remote_nodemap->num, nodemap->num));
2764                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2765                         goto again;
2766                 }
2767
2768                 /* if the nodes disagree on which nodes exist and are
2769                    active, then that is also a good reason to do recovery
2770                  */
2771                 for (i=0;i<nodemap->num;i++) {
2772                         if (remote_nodemap->nodes[i].pnn != nodemap->nodes[i].pnn) {
2773                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
2774                                           nodemap->nodes[j].pnn, i, 
2775                                           remote_nodemap->nodes[i].pnn, nodemap->nodes[i].pnn));
2776                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2777                                             vnnmap, nodemap->nodes[j].pnn);
2778                                 goto again;
2779                         }
2780                         if ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
2781                             (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2782                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap flag for %d (0x%x vs 0x%x)\n", 
2783                                           nodemap->nodes[j].pnn, i,
2784                                           remote_nodemap->nodes[i].flags, nodemap->nodes[i].flags));
2785                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2786                                             vnnmap, nodemap->nodes[j].pnn);
2787                                 goto again;
2788                         }
2789                 }
2790
2791         }
2792
2793
2794         /* there better be the same number of lmasters in the vnn map
2795            as there are active nodes or we will have to do a recovery
2796          */
2797         if (vnnmap->size != rec->num_active) {
2798                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
2799                           vnnmap->size, rec->num_active));
2800                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, ctdb->pnn);
2801                 goto again;
2802         }
2803
2804         /* verify that all active nodes in the nodemap also exist in 
2805            the vnnmap.
2806          */
2807         for (j=0; j<nodemap->num; j++) {
2808                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2809                         continue;
2810                 }
2811                 if (nodemap->nodes[j].pnn == pnn) {
2812                         continue;
2813                 }
2814
2815                 for (i=0; i<vnnmap->size; i++) {
2816                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
2817                                 break;
2818                         }
2819                 }
2820                 if (i == vnnmap->size) {
2821                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
2822                                   nodemap->nodes[j].pnn));
2823                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2824                         goto again;
2825                 }
2826         }
2827
2828         
2829         /* verify that all other nodes have the same vnnmap
2830            and are from the same generation
2831          */
2832         for (j=0; j<nodemap->num; j++) {
2833                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2834                         continue;
2835                 }
2836                 if (nodemap->nodes[j].pnn == pnn) {
2837                         continue;
2838                 }
2839
2840                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2841                                           mem_ctx, &remote_vnnmap);
2842                 if (ret != 0) {
2843                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
2844                                   nodemap->nodes[j].pnn));
2845                         goto again;
2846                 }
2847
2848                 /* verify the vnnmap generation is the same */
2849                 if (vnnmap->generation != remote_vnnmap->generation) {
2850                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
2851                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
2852                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2853                         goto again;
2854                 }
2855
2856                 /* verify the vnnmap size is the same */
2857                 if (vnnmap->size != remote_vnnmap->size) {
2858                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
2859                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
2860                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap, nodemap->nodes[j].pnn);
2861                         goto again;
2862                 }
2863
2864                 /* verify the vnnmap is the same */
2865                 for (i=0;i<vnnmap->size;i++) {
2866                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
2867                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
2868                                           nodemap->nodes[j].pnn));
2869                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
2870                                             vnnmap, nodemap->nodes[j].pnn);
2871                                 goto again;
2872                         }
2873                 }
2874         }
2875
2876         /* we might need to change who has what IP assigned */
2877         if (rec->need_takeover_run) {
2878                 rec->need_takeover_run = false;
2879
2880                 /* execute the "startrecovery" event script on all nodes */
2881                 ret = run_startrecovery_eventscript(rec, nodemap);
2882                 if (ret!=0) {
2883                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
2884                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2885                                     vnnmap, ctdb->pnn);
2886                 }
2887
2888                 ret = ctdb_takeover_run(ctdb, nodemap);
2889                 if (ret != 0) {
2890                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
2891                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2892                                     vnnmap, ctdb->pnn);
2893                 }
2894
2895                 /* execute the "recovered" event script on all nodes */
2896                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
2897 #if 0
2898 // we cant check whether the event completed successfully
2899 // since this script WILL fail if the node is in recovery mode
2900 // and if that race happens, the code here would just cause a second
2901 // cascading recovery.
2902                 if (ret!=0) {
2903                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
2904                         do_recovery(rec, mem_ctx, pnn, nodemap, 
2905                                     vnnmap, ctdb->pnn);
2906                 }
2907 #endif
2908         }
2909
2910
2911         DEBUG(DEBUG_DEBUG, (__location__ " Update flags on all nodes\n"));
2912         /*
2913           update all nodes to have the same flags that we have
2914          */
2915         ret = update_flags_on_all_nodes(ctdb, nodemap);
2916         if (ret != 0) {
2917                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes\n"));
2918                 goto again;
2919         }
2920
2921         goto again;
2922
2923 }
2924
2925 /*
2926   event handler for when the main ctdbd dies
2927  */
2928 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
2929                                  uint16_t flags, void *private_data)
2930 {
2931         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
2932         _exit(1);
2933 }
2934
2935 /*
2936   called regularly to verify that the recovery daemon is still running
2937  */
2938 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
2939                               struct timeval yt, void *p)
2940 {
2941         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
2942
2943         if (kill(ctdb->recoverd_pid, 0) != 0) {
2944                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
2945
2946                 ctdb_stop_recoverd(ctdb);
2947                 ctdb_stop_keepalive(ctdb);
2948                 ctdb_stop_monitoring(ctdb);
2949                 ctdb_release_all_ips(ctdb);
2950                 if (ctdb->methods != NULL) {
2951                         ctdb->methods->shutdown(ctdb);
2952                 }
2953                 ctdb_event_script(ctdb, "shutdown");
2954
2955                 exit(10);       
2956         }
2957
2958         event_add_timed(ctdb->ev, ctdb, 
2959                         timeval_current_ofs(30, 0),
2960                         ctdb_check_recd, ctdb);
2961 }
2962
2963 static void recd_sig_child_handler(struct event_context *ev,
2964         struct signal_event *se, int signum, int count,
2965         void *dont_care, 
2966         void *private_data)
2967 {
2968 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
2969         int status;
2970         pid_t pid = -1;
2971
2972         while (pid != 0) {
2973                 pid = waitpid(-1, &status, WNOHANG);
2974                 if (pid == -1) {
2975                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
2976                         return;
2977                 }
2978                 if (pid > 0) {
2979                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
2980                 }
2981         }
2982 }
2983
2984 /*
2985   startup the recovery daemon as a child of the main ctdb daemon
2986  */
2987 int ctdb_start_recoverd(struct ctdb_context *ctdb)
2988 {
2989         int ret;
2990         int fd[2];
2991         struct signal_event *se;
2992
2993         if (pipe(fd) != 0) {
2994                 return -1;
2995         }
2996
2997         ctdb->ctdbd_pid = getpid();
2998
2999         ctdb->recoverd_pid = fork();
3000         if (ctdb->recoverd_pid == -1) {
3001                 return -1;
3002         }
3003         
3004         if (ctdb->recoverd_pid != 0) {
3005                 close(fd[0]);
3006                 event_add_timed(ctdb->ev, ctdb, 
3007                                 timeval_current_ofs(30, 0),
3008                                 ctdb_check_recd, ctdb);
3009                 return 0;
3010         }
3011
3012         close(fd[1]);
3013
3014         /* shutdown the transport */
3015         if (ctdb->methods) {
3016                 ctdb->methods->shutdown(ctdb);
3017         }
3018
3019         /* get a new event context */
3020         talloc_free(ctdb->ev);
3021         ctdb->ev = event_context_init(ctdb);
3022
3023         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3024                      ctdb_recoverd_parent, &fd[0]);     
3025
3026         close(ctdb->daemon.sd);
3027         ctdb->daemon.sd = -1;
3028
3029         srandom(getpid() ^ time(NULL));
3030
3031         /* the recovery daemon does not need to be realtime */
3032         if (ctdb->do_setsched) {
3033                 ctdb_restore_scheduler(ctdb);
3034         }
3035
3036         /* initialise ctdb */
3037         ret = ctdb_socket_connect(ctdb);
3038         if (ret != 0) {
3039                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb\n"));
3040                 exit(1);
3041         }
3042
3043         /* set up a handler to pick up sigchld */
3044         se = event_add_signal(ctdb->ev, ctdb,
3045                                      SIGCHLD, 0,
3046                                      recd_sig_child_handler,
3047                                      ctdb);
3048         if (se == NULL) {
3049                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3050                 exit(1);
3051         }
3052
3053         monitor_cluster(ctdb);
3054
3055         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3056         return -1;
3057 }
3058
3059 /*
3060   shutdown the recovery daemon
3061  */
3062 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3063 {
3064         if (ctdb->recoverd_pid == 0) {
3065                 return;
3066         }
3067
3068         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3069         kill(ctdb->recoverd_pid, SIGTERM);
3070 }