recoverd: Always do an early exit from main_loop if node is stopped or banned
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "dlinklist.h"
31
32
33 /* most recent reload all ips request we need to perform during the 
34    next monitoring loop
35 */
36 struct reloadips_all_reply *reload_all_ips_request = NULL;
37
38 /* list of "ctdb ipreallocate" processes to call back when we have
39    finished the takeover run.
40 */
41 struct ip_reallocate_list {
42         struct ip_reallocate_list *next;
43         struct rd_memdump_reply *rd;
44 };
45
46 struct ctdb_banning_state {
47         uint32_t count;
48         struct timeval last_reported_time;
49 };
50
51 /*
52   private state of recovery daemon
53  */
54 struct ctdb_recoverd {
55         struct ctdb_context *ctdb;
56         uint32_t recmaster;
57         uint32_t num_active;
58         uint32_t num_connected;
59         uint32_t last_culprit_node;
60         struct ctdb_node_map *nodemap;
61         struct timeval priority_time;
62         bool need_takeover_run;
63         bool need_recovery;
64         uint32_t node_flags;
65         struct timed_event *send_election_te;
66         struct timed_event *election_timeout;
67         struct vacuum_info *vacuum_info;
68         TALLOC_CTX *ip_reallocate_ctx;
69         struct ip_reallocate_list *reallocate_callers;
70         TALLOC_CTX *ip_check_disable_ctx;
71         struct ctdb_control_get_ifaces *ifaces;
72         TALLOC_CTX *deferred_rebalance_ctx;
73 };
74
75 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
76 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
77
78 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
79
80 /*
81   ban a node for a period of time
82  */
83 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
84 {
85         int ret;
86         struct ctdb_context *ctdb = rec->ctdb;
87         struct ctdb_ban_time bantime;
88        
89         if (!ctdb_validate_pnn(ctdb, pnn)) {
90                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
91                 return;
92         }
93
94         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
95
96         bantime.pnn  = pnn;
97         bantime.time = ban_time;
98
99         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
100         if (ret != 0) {
101                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
102                 return;
103         }
104
105 }
106
107 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
108
109
110 /*
111   remember the trouble maker
112  */
113 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
114 {
115         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
116         struct ctdb_banning_state *ban_state;
117
118         if (culprit > ctdb->num_nodes) {
119                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
120                 return;
121         }
122
123         /* If we are banned or stopped, do not set other nodes as culprits */
124         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
125                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
126                 return;
127         }
128
129         if (ctdb->nodes[culprit]->ban_state == NULL) {
130                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
131                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
132
133                 
134         }
135         ban_state = ctdb->nodes[culprit]->ban_state;
136         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
137                 /* this was the first time in a long while this node
138                    misbehaved so we will forgive any old transgressions.
139                 */
140                 ban_state->count = 0;
141         }
142
143         ban_state->count += count;
144         ban_state->last_reported_time = timeval_current();
145         rec->last_culprit_node = culprit;
146 }
147
148 /*
149   remember the trouble maker
150  */
151 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
152 {
153         ctdb_set_culprit_count(rec, culprit, 1);
154 }
155
156
157 /* this callback is called for every node that failed to execute the
158    recovered event
159 */
160 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
161 {
162         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
163
164         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
165
166         ctdb_set_culprit(rec, node_pnn);
167 }
168
169 /*
170   run the "recovered" eventscript on all nodes
171  */
172 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
173 {
174         TALLOC_CTX *tmp_ctx;
175         uint32_t *nodes;
176         struct ctdb_context *ctdb = rec->ctdb;
177
178         tmp_ctx = talloc_new(ctdb);
179         CTDB_NO_MEMORY(ctdb, tmp_ctx);
180
181         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
182         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
183                                         nodes, 0,
184                                         CONTROL_TIMEOUT(), false, tdb_null,
185                                         NULL, recovered_fail_callback,
186                                         rec) != 0) {
187                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
188
189                 talloc_free(tmp_ctx);
190                 return -1;
191         }
192
193         talloc_free(tmp_ctx);
194         return 0;
195 }
196
197 /* this callback is called for every node that failed to execute the
198    start recovery event
199 */
200 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
201 {
202         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
203
204         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
205
206         ctdb_set_culprit(rec, node_pnn);
207 }
208
209 /*
210   run the "startrecovery" eventscript on all nodes
211  */
212 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
213 {
214         TALLOC_CTX *tmp_ctx;
215         uint32_t *nodes;
216         struct ctdb_context *ctdb = rec->ctdb;
217
218         tmp_ctx = talloc_new(ctdb);
219         CTDB_NO_MEMORY(ctdb, tmp_ctx);
220
221         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
222         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
223                                         nodes, 0,
224                                         CONTROL_TIMEOUT(), false, tdb_null,
225                                         NULL,
226                                         startrecovery_fail_callback,
227                                         rec) != 0) {
228                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
229                 talloc_free(tmp_ctx);
230                 return -1;
231         }
232
233         talloc_free(tmp_ctx);
234         return 0;
235 }
236
237 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
238 {
239         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
240                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
241                 return;
242         }
243         if (node_pnn < ctdb->num_nodes) {
244                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
245         }
246
247         if (node_pnn == ctdb->pnn) {
248                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
249         }
250 }
251
252 /*
253   update the node capabilities for all connected nodes
254  */
255 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
256 {
257         uint32_t *nodes;
258         TALLOC_CTX *tmp_ctx;
259
260         tmp_ctx = talloc_new(ctdb);
261         CTDB_NO_MEMORY(ctdb, tmp_ctx);
262
263         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
264         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
265                                         nodes, 0,
266                                         CONTROL_TIMEOUT(),
267                                         false, tdb_null,
268                                         async_getcap_callback, NULL,
269                                         NULL) != 0) {
270                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
271                 talloc_free(tmp_ctx);
272                 return -1;
273         }
274
275         talloc_free(tmp_ctx);
276         return 0;
277 }
278
279 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
280 {
281         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
282
283         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
284         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
285 }
286
287 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
288 {
289         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
290
291         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
292         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
293 }
294
295 /*
296   change recovery mode on all nodes
297  */
298 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
299 {
300         TDB_DATA data;
301         uint32_t *nodes;
302         TALLOC_CTX *tmp_ctx;
303
304         tmp_ctx = talloc_new(ctdb);
305         CTDB_NO_MEMORY(ctdb, tmp_ctx);
306
307         /* freeze all nodes */
308         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
309         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
310                 int i;
311
312                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
313                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
314                                                 nodes, i,
315                                                 CONTROL_TIMEOUT(),
316                                                 false, tdb_null,
317                                                 NULL,
318                                                 set_recmode_fail_callback,
319                                                 rec) != 0) {
320                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
321                                 talloc_free(tmp_ctx);
322                                 return -1;
323                         }
324                 }
325         }
326
327
328         data.dsize = sizeof(uint32_t);
329         data.dptr = (unsigned char *)&rec_mode;
330
331         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
332                                         nodes, 0,
333                                         CONTROL_TIMEOUT(),
334                                         false, data,
335                                         NULL, NULL,
336                                         NULL) != 0) {
337                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
338                 talloc_free(tmp_ctx);
339                 return -1;
340         }
341
342         talloc_free(tmp_ctx);
343         return 0;
344 }
345
346 /*
347   change recovery master on all node
348  */
349 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
350 {
351         TDB_DATA data;
352         TALLOC_CTX *tmp_ctx;
353         uint32_t *nodes;
354
355         tmp_ctx = talloc_new(ctdb);
356         CTDB_NO_MEMORY(ctdb, tmp_ctx);
357
358         data.dsize = sizeof(uint32_t);
359         data.dptr = (unsigned char *)&pnn;
360
361         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
362         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
363                                         nodes, 0,
364                                         CONTROL_TIMEOUT(), false, data,
365                                         NULL, NULL,
366                                         NULL) != 0) {
367                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
368                 talloc_free(tmp_ctx);
369                 return -1;
370         }
371
372         talloc_free(tmp_ctx);
373         return 0;
374 }
375
376 /* update all remote nodes to use the same db priority that we have
377    this can fail if the remove node has not yet been upgraded to 
378    support this function, so we always return success and never fail
379    a recovery if this call fails.
380 */
381 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
382         struct ctdb_node_map *nodemap, 
383         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
384 {
385         int db;
386         uint32_t *nodes;
387
388         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
389
390         /* step through all local databases */
391         for (db=0; db<dbmap->num;db++) {
392                 TDB_DATA data;
393                 struct ctdb_db_priority db_prio;
394                 int ret;
395
396                 db_prio.db_id     = dbmap->dbs[db].dbid;
397                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
398                 if (ret != 0) {
399                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
400                         continue;
401                 }
402
403                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
404
405                 data.dptr  = (uint8_t *)&db_prio;
406                 data.dsize = sizeof(db_prio);
407
408                 if (ctdb_client_async_control(ctdb,
409                                         CTDB_CONTROL_SET_DB_PRIORITY,
410                                         nodes, 0,
411                                         CONTROL_TIMEOUT(), false, data,
412                                         NULL, NULL,
413                                         NULL) != 0) {
414                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
415                 }
416         }
417
418         return 0;
419 }                       
420
421 /*
422   ensure all other nodes have attached to any databases that we have
423  */
424 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
425                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
426 {
427         int i, j, db, ret;
428         struct ctdb_dbid_map *remote_dbmap;
429
430         /* verify that all other nodes have all our databases */
431         for (j=0; j<nodemap->num; j++) {
432                 /* we dont need to ourself ourselves */
433                 if (nodemap->nodes[j].pnn == pnn) {
434                         continue;
435                 }
436                 /* dont check nodes that are unavailable */
437                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
438                         continue;
439                 }
440
441                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
442                                          mem_ctx, &remote_dbmap);
443                 if (ret != 0) {
444                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
445                         return -1;
446                 }
447
448                 /* step through all local databases */
449                 for (db=0; db<dbmap->num;db++) {
450                         const char *name;
451
452
453                         for (i=0;i<remote_dbmap->num;i++) {
454                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
455                                         break;
456                                 }
457                         }
458                         /* the remote node already have this database */
459                         if (i!=remote_dbmap->num) {
460                                 continue;
461                         }
462                         /* ok so we need to create this database */
463                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
464                                             mem_ctx, &name);
465                         if (ret != 0) {
466                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
467                                 return -1;
468                         }
469                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
470                                            mem_ctx, name,
471                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
472                         if (ret != 0) {
473                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
474                                 return -1;
475                         }
476                 }
477         }
478
479         return 0;
480 }
481
482
483 /*
484   ensure we are attached to any databases that anyone else is attached to
485  */
486 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
487                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
488 {
489         int i, j, db, ret;
490         struct ctdb_dbid_map *remote_dbmap;
491
492         /* verify that we have all database any other node has */
493         for (j=0; j<nodemap->num; j++) {
494                 /* we dont need to ourself ourselves */
495                 if (nodemap->nodes[j].pnn == pnn) {
496                         continue;
497                 }
498                 /* dont check nodes that are unavailable */
499                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
500                         continue;
501                 }
502
503                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
504                                          mem_ctx, &remote_dbmap);
505                 if (ret != 0) {
506                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
507                         return -1;
508                 }
509
510                 /* step through all databases on the remote node */
511                 for (db=0; db<remote_dbmap->num;db++) {
512                         const char *name;
513
514                         for (i=0;i<(*dbmap)->num;i++) {
515                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
516                                         break;
517                                 }
518                         }
519                         /* we already have this db locally */
520                         if (i!=(*dbmap)->num) {
521                                 continue;
522                         }
523                         /* ok so we need to create this database and
524                            rebuild dbmap
525                          */
526                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
527                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
528                         if (ret != 0) {
529                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
530                                           nodemap->nodes[j].pnn));
531                                 return -1;
532                         }
533                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
534                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
535                         if (ret != 0) {
536                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
537                                 return -1;
538                         }
539                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
540                         if (ret != 0) {
541                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
542                                 return -1;
543                         }
544                 }
545         }
546
547         return 0;
548 }
549
550
551 /*
552   pull the remote database contents from one node into the recdb
553  */
554 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
555                                     struct tdb_wrap *recdb, uint32_t dbid)
556 {
557         int ret;
558         TDB_DATA outdata;
559         struct ctdb_marshall_buffer *reply;
560         struct ctdb_rec_data *rec;
561         int i;
562         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
563
564         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
565                                CONTROL_TIMEOUT(), &outdata);
566         if (ret != 0) {
567                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
568                 talloc_free(tmp_ctx);
569                 return -1;
570         }
571
572         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
573
574         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
575                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
576                 talloc_free(tmp_ctx);
577                 return -1;
578         }
579         
580         rec = (struct ctdb_rec_data *)&reply->data[0];
581         
582         for (i=0;
583              i<reply->count;
584              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
585                 TDB_DATA key, data;
586                 struct ctdb_ltdb_header *hdr;
587                 TDB_DATA existing;
588                 
589                 key.dptr = &rec->data[0];
590                 key.dsize = rec->keylen;
591                 data.dptr = &rec->data[key.dsize];
592                 data.dsize = rec->datalen;
593                 
594                 hdr = (struct ctdb_ltdb_header *)data.dptr;
595
596                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
597                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
598                         talloc_free(tmp_ctx);
599                         return -1;
600                 }
601
602                 /* fetch the existing record, if any */
603                 existing = tdb_fetch(recdb->tdb, key);
604                 
605                 if (existing.dptr != NULL) {
606                         struct ctdb_ltdb_header header;
607                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
608                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
609                                          (unsigned)existing.dsize, srcnode));
610                                 free(existing.dptr);
611                                 talloc_free(tmp_ctx);
612                                 return -1;
613                         }
614                         header = *(struct ctdb_ltdb_header *)existing.dptr;
615                         free(existing.dptr);
616                         if (!(header.rsn < hdr->rsn ||
617                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
618                                 continue;
619                         }
620                 }
621                 
622                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
623                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
624                         talloc_free(tmp_ctx);
625                         return -1;                              
626                 }
627         }
628
629         talloc_free(tmp_ctx);
630
631         return 0;
632 }
633
634
635 struct pull_seqnum_cbdata {
636         int failed;
637         uint32_t pnn;
638         uint64_t seqnum;
639 };
640
641 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
642 {
643         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
644         uint64_t seqnum;
645
646         if (cb_data->failed != 0) {
647                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
648                 return;
649         }
650
651         if (res != 0) {
652                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
653                 cb_data->failed = 1;
654                 return;
655         }
656
657         if (outdata.dsize != sizeof(uint64_t)) {
658                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
659                 cb_data->failed = -1;
660                 return;
661         }
662
663         seqnum = *((uint64_t *)outdata.dptr);
664
665         if (seqnum > cb_data->seqnum) {
666                 cb_data->seqnum = seqnum;
667                 cb_data->pnn = node_pnn;
668         }
669 }
670
671 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
672 {
673         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
674
675         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
676         cb_data->failed = 1;
677 }
678
679 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
680                                 struct ctdb_recoverd *rec, 
681                                 struct ctdb_node_map *nodemap, 
682                                 struct tdb_wrap *recdb, uint32_t dbid)
683 {
684         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
685         uint32_t *nodes;
686         TDB_DATA data;
687         uint32_t outdata[2];
688         struct pull_seqnum_cbdata *cb_data;
689
690         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
691
692         outdata[0] = dbid;
693         outdata[1] = 0;
694
695         data.dsize = sizeof(outdata);
696         data.dptr  = (uint8_t *)&outdata[0];
697
698         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
699         if (cb_data == NULL) {
700                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
701                 talloc_free(tmp_ctx);
702                 return -1;
703         }
704
705         cb_data->failed = 0;
706         cb_data->pnn    = -1;
707         cb_data->seqnum = 0;
708         
709         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
710         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
711                                         nodes, 0,
712                                         CONTROL_TIMEOUT(), false, data,
713                                         pull_seqnum_cb,
714                                         pull_seqnum_fail_cb,
715                                         cb_data) != 0) {
716                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
717
718                 talloc_free(tmp_ctx);
719                 return -1;
720         }
721
722         if (cb_data->failed != 0) {
723                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
724                 talloc_free(tmp_ctx);
725                 return -1;
726         }
727
728         if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
729                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
730                 talloc_free(tmp_ctx);
731                 return -1;
732         }
733
734         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
735
736         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
737                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
738                 talloc_free(tmp_ctx);
739                 return -1;
740         }
741
742         talloc_free(tmp_ctx);
743         return 0;
744 }
745
746
747 /*
748   pull all the remote database contents into the recdb
749  */
750 static int pull_remote_database(struct ctdb_context *ctdb,
751                                 struct ctdb_recoverd *rec, 
752                                 struct ctdb_node_map *nodemap, 
753                                 struct tdb_wrap *recdb, uint32_t dbid,
754                                 bool persistent)
755 {
756         int j;
757
758         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
759                 int ret;
760                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
761                 if (ret == 0) {
762                         return 0;
763                 }
764         }
765
766         /* pull all records from all other nodes across onto this node
767            (this merges based on rsn)
768         */
769         for (j=0; j<nodemap->num; j++) {
770                 /* dont merge from nodes that are unavailable */
771                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
772                         continue;
773                 }
774                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
775                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
776                                  nodemap->nodes[j].pnn));
777                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
778                         return -1;
779                 }
780         }
781         
782         return 0;
783 }
784
785
786 /*
787   update flags on all active nodes
788  */
789 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
790 {
791         int ret;
792
793         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
794                 if (ret != 0) {
795                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
796                 return -1;
797         }
798
799         return 0;
800 }
801
802 /*
803   ensure all nodes have the same vnnmap we do
804  */
805 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
806                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
807 {
808         int j, ret;
809
810         /* push the new vnn map out to all the nodes */
811         for (j=0; j<nodemap->num; j++) {
812                 /* dont push to nodes that are unavailable */
813                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
814                         continue;
815                 }
816
817                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
818                 if (ret != 0) {
819                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
820                         return -1;
821                 }
822         }
823
824         return 0;
825 }
826
827
828 struct vacuum_info {
829         struct vacuum_info *next, *prev;
830         struct ctdb_recoverd *rec;
831         uint32_t srcnode;
832         struct ctdb_db_context *ctdb_db;
833         struct ctdb_marshall_buffer *recs;
834         struct ctdb_rec_data *r;
835 };
836
837 static void vacuum_fetch_next(struct vacuum_info *v);
838
839 /*
840   called when a vacuum fetch has completed - just free it and do the next one
841  */
842 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
843 {
844         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
845         talloc_free(state);
846         vacuum_fetch_next(v);
847 }
848
849
850 /*
851   process the next element from the vacuum list
852 */
853 static void vacuum_fetch_next(struct vacuum_info *v)
854 {
855         struct ctdb_call call;
856         struct ctdb_rec_data *r;
857
858         while (v->recs->count) {
859                 struct ctdb_client_call_state *state;
860                 TDB_DATA data;
861                 struct ctdb_ltdb_header *hdr;
862
863                 ZERO_STRUCT(call);
864                 call.call_id = CTDB_NULL_FUNC;
865                 call.flags = CTDB_IMMEDIATE_MIGRATION;
866                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
867
868                 r = v->r;
869                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
870                 v->recs->count--;
871
872                 call.key.dptr = &r->data[0];
873                 call.key.dsize = r->keylen;
874
875                 /* ensure we don't block this daemon - just skip a record if we can't get
876                    the chainlock */
877                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
878                         continue;
879                 }
880
881                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
882                 if (data.dptr == NULL) {
883                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
884                         continue;
885                 }
886
887                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
888                         free(data.dptr);
889                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
890                         continue;
891                 }
892                 
893                 hdr = (struct ctdb_ltdb_header *)data.dptr;
894                 if (hdr->dmaster == v->rec->ctdb->pnn) {
895                         /* its already local */
896                         free(data.dptr);
897                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
898                         continue;
899                 }
900
901                 free(data.dptr);
902
903                 state = ctdb_call_send(v->ctdb_db, &call);
904                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
905                 if (state == NULL) {
906                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
907                         talloc_free(v);
908                         return;
909                 }
910                 state->async.fn = vacuum_fetch_callback;
911                 state->async.private_data = v;
912                 return;
913         }
914
915         talloc_free(v);
916 }
917
918
919 /*
920   destroy a vacuum info structure
921  */
922 static int vacuum_info_destructor(struct vacuum_info *v)
923 {
924         DLIST_REMOVE(v->rec->vacuum_info, v);
925         return 0;
926 }
927
928
929 /*
930   handler for vacuum fetch
931 */
932 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
933                                  TDB_DATA data, void *private_data)
934 {
935         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
936         struct ctdb_marshall_buffer *recs;
937         int ret, i;
938         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
939         const char *name;
940         struct ctdb_dbid_map *dbmap=NULL;
941         bool persistent = false;
942         struct ctdb_db_context *ctdb_db;
943         struct ctdb_rec_data *r;
944         uint32_t srcnode;
945         struct vacuum_info *v;
946
947         recs = (struct ctdb_marshall_buffer *)data.dptr;
948         r = (struct ctdb_rec_data *)&recs->data[0];
949
950         if (recs->count == 0) {
951                 talloc_free(tmp_ctx);
952                 return;
953         }
954
955         srcnode = r->reqid;
956
957         for (v=rec->vacuum_info;v;v=v->next) {
958                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
959                         /* we're already working on records from this node */
960                         talloc_free(tmp_ctx);
961                         return;
962                 }
963         }
964
965         /* work out if the database is persistent */
966         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
967         if (ret != 0) {
968                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
969                 talloc_free(tmp_ctx);
970                 return;
971         }
972
973         for (i=0;i<dbmap->num;i++) {
974                 if (dbmap->dbs[i].dbid == recs->db_id) {
975                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
976                         break;
977                 }
978         }
979         if (i == dbmap->num) {
980                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
981                 talloc_free(tmp_ctx);
982                 return;         
983         }
984
985         /* find the name of this database */
986         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
987                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
988                 talloc_free(tmp_ctx);
989                 return;
990         }
991
992         /* attach to it */
993         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
994         if (ctdb_db == NULL) {
995                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
996                 talloc_free(tmp_ctx);
997                 return;
998         }
999
1000         v = talloc_zero(rec, struct vacuum_info);
1001         if (v == NULL) {
1002                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1003                 talloc_free(tmp_ctx);
1004                 return;
1005         }
1006
1007         v->rec = rec;
1008         v->srcnode = srcnode;
1009         v->ctdb_db = ctdb_db;
1010         v->recs = talloc_memdup(v, recs, data.dsize);
1011         if (v->recs == NULL) {
1012                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1013                 talloc_free(v);
1014                 talloc_free(tmp_ctx);
1015                 return;         
1016         }
1017         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
1018
1019         DLIST_ADD(rec->vacuum_info, v);
1020
1021         talloc_set_destructor(v, vacuum_info_destructor);
1022
1023         vacuum_fetch_next(v);
1024         talloc_free(tmp_ctx);
1025 }
1026
1027
1028 /*
1029   called when ctdb_wait_timeout should finish
1030  */
1031 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1032                               struct timeval yt, void *p)
1033 {
1034         uint32_t *timed_out = (uint32_t *)p;
1035         (*timed_out) = 1;
1036 }
1037
1038 /*
1039   wait for a given number of seconds
1040  */
1041 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1042 {
1043         uint32_t timed_out = 0;
1044         time_t usecs = (secs - (time_t)secs) * 1000000;
1045         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1046         while (!timed_out) {
1047                 event_loop_once(ctdb->ev);
1048         }
1049 }
1050
1051 /*
1052   called when an election times out (ends)
1053  */
1054 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1055                                   struct timeval t, void *p)
1056 {
1057         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1058         rec->election_timeout = NULL;
1059         fast_start = false;
1060
1061         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1062 }
1063
1064
1065 /*
1066   wait for an election to finish. It finished election_timeout seconds after
1067   the last election packet is received
1068  */
1069 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1070 {
1071         struct ctdb_context *ctdb = rec->ctdb;
1072         while (rec->election_timeout) {
1073                 event_loop_once(ctdb->ev);
1074         }
1075 }
1076
1077 /*
1078   Update our local flags from all remote connected nodes. 
1079   This is only run when we are or we belive we are the recovery master
1080  */
1081 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1082 {
1083         int j;
1084         struct ctdb_context *ctdb = rec->ctdb;
1085         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1086
1087         /* get the nodemap for all active remote nodes and verify
1088            they are the same as for this node
1089          */
1090         for (j=0; j<nodemap->num; j++) {
1091                 struct ctdb_node_map *remote_nodemap=NULL;
1092                 int ret;
1093
1094                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1095                         continue;
1096                 }
1097                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1098                         continue;
1099                 }
1100
1101                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1102                                            mem_ctx, &remote_nodemap);
1103                 if (ret != 0) {
1104                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1105                                   nodemap->nodes[j].pnn));
1106                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1107                         talloc_free(mem_ctx);
1108                         return MONITOR_FAILED;
1109                 }
1110                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1111                         /* We should tell our daemon about this so it
1112                            updates its flags or else we will log the same 
1113                            message again in the next iteration of recovery.
1114                            Since we are the recovery master we can just as
1115                            well update the flags on all nodes.
1116                         */
1117                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1118                         if (ret != 0) {
1119                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1120                                 return -1;
1121                         }
1122
1123                         /* Update our local copy of the flags in the recovery
1124                            daemon.
1125                         */
1126                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1127                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1128                                  nodemap->nodes[j].flags));
1129                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1130                 }
1131                 talloc_free(remote_nodemap);
1132         }
1133         talloc_free(mem_ctx);
1134         return MONITOR_OK;
1135 }
1136
1137
1138 /* Create a new random generation ip. 
1139    The generation id can not be the INVALID_GENERATION id
1140 */
1141 static uint32_t new_generation(void)
1142 {
1143         uint32_t generation;
1144
1145         while (1) {
1146                 generation = random();
1147
1148                 if (generation != INVALID_GENERATION) {
1149                         break;
1150                 }
1151         }
1152
1153         return generation;
1154 }
1155
1156
1157 /*
1158   create a temporary working database
1159  */
1160 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1161 {
1162         char *name;
1163         struct tdb_wrap *recdb;
1164         unsigned tdb_flags;
1165
1166         /* open up the temporary recovery database */
1167         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1168                                ctdb->db_directory_state,
1169                                ctdb->pnn);
1170         if (name == NULL) {
1171                 return NULL;
1172         }
1173         unlink(name);
1174
1175         tdb_flags = TDB_NOLOCK;
1176         if (ctdb->valgrinding) {
1177                 tdb_flags |= TDB_NOMMAP;
1178         }
1179         tdb_flags |= TDB_DISALLOW_NESTING;
1180
1181         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1182                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1183         if (recdb == NULL) {
1184                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1185         }
1186
1187         talloc_free(name);
1188
1189         return recdb;
1190 }
1191
1192
1193 /* 
1194    a traverse function for pulling all relevant records from recdb
1195  */
1196 struct recdb_data {
1197         struct ctdb_context *ctdb;
1198         struct ctdb_marshall_buffer *recdata;
1199         uint32_t len;
1200         uint32_t allocated_len;
1201         bool failed;
1202         bool persistent;
1203 };
1204
1205 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1206 {
1207         struct recdb_data *params = (struct recdb_data *)p;
1208         struct ctdb_rec_data *rec;
1209         struct ctdb_ltdb_header *hdr;
1210
1211         /*
1212          * skip empty records - but NOT for persistent databases:
1213          *
1214          * The record-by-record mode of recovery deletes empty records.
1215          * For persistent databases, this can lead to data corruption
1216          * by deleting records that should be there:
1217          *
1218          * - Assume the cluster has been running for a while.
1219          *
1220          * - A record R in a persistent database has been created and
1221          *   deleted a couple of times, the last operation being deletion,
1222          *   leaving an empty record with a high RSN, say 10.
1223          *
1224          * - Now a node N is turned off.
1225          *
1226          * - This leaves the local database copy of D on N with the empty
1227          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1228          *   the copy of record R.
1229          *
1230          * - Now the record is created again while node N is turned off.
1231          *   This creates R with RSN = 1 on all nodes except for N.
1232          *
1233          * - Now node N is turned on again. The following recovery will chose
1234          *   the older empty copy of R due to RSN 10 > RSN 1.
1235          *
1236          * ==> Hence the record is gone after the recovery.
1237          *
1238          * On databases like Samba's registry, this can damage the higher-level
1239          * data structures built from the various tdb-level records.
1240          */
1241         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1242                 return 0;
1243         }
1244
1245         /* update the dmaster field to point to us */
1246         hdr = (struct ctdb_ltdb_header *)data.dptr;
1247         if (!params->persistent) {
1248                 hdr->dmaster = params->ctdb->pnn;
1249                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1250         }
1251
1252         /* add the record to the blob ready to send to the nodes */
1253         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1254         if (rec == NULL) {
1255                 params->failed = true;
1256                 return -1;
1257         }
1258         if (params->len + rec->length >= params->allocated_len) {
1259                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1260                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1261         }
1262         if (params->recdata == NULL) {
1263                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1264                          rec->length + params->len, params->recdata->count));
1265                 params->failed = true;
1266                 return -1;
1267         }
1268         params->recdata->count++;
1269         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1270         params->len += rec->length;
1271         talloc_free(rec);
1272
1273         return 0;
1274 }
1275
1276 /*
1277   push the recdb database out to all nodes
1278  */
1279 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1280                                bool persistent,
1281                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1282 {
1283         struct recdb_data params;
1284         struct ctdb_marshall_buffer *recdata;
1285         TDB_DATA outdata;
1286         TALLOC_CTX *tmp_ctx;
1287         uint32_t *nodes;
1288
1289         tmp_ctx = talloc_new(ctdb);
1290         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1291
1292         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1293         CTDB_NO_MEMORY(ctdb, recdata);
1294
1295         recdata->db_id = dbid;
1296
1297         params.ctdb = ctdb;
1298         params.recdata = recdata;
1299         params.len = offsetof(struct ctdb_marshall_buffer, data);
1300         params.allocated_len = params.len;
1301         params.failed = false;
1302         params.persistent = persistent;
1303
1304         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1305                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1306                 talloc_free(params.recdata);
1307                 talloc_free(tmp_ctx);
1308                 return -1;
1309         }
1310
1311         if (params.failed) {
1312                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1313                 talloc_free(params.recdata);
1314                 talloc_free(tmp_ctx);
1315                 return -1;              
1316         }
1317
1318         recdata = params.recdata;
1319
1320         outdata.dptr = (void *)recdata;
1321         outdata.dsize = params.len;
1322
1323         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1324         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1325                                         nodes, 0,
1326                                         CONTROL_TIMEOUT(), false, outdata,
1327                                         NULL, NULL,
1328                                         NULL) != 0) {
1329                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1330                 talloc_free(recdata);
1331                 talloc_free(tmp_ctx);
1332                 return -1;
1333         }
1334
1335         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1336                   dbid, recdata->count));
1337
1338         talloc_free(recdata);
1339         talloc_free(tmp_ctx);
1340
1341         return 0;
1342 }
1343
1344
1345 /*
1346   go through a full recovery on one database 
1347  */
1348 static int recover_database(struct ctdb_recoverd *rec, 
1349                             TALLOC_CTX *mem_ctx,
1350                             uint32_t dbid,
1351                             bool persistent,
1352                             uint32_t pnn, 
1353                             struct ctdb_node_map *nodemap,
1354                             uint32_t transaction_id)
1355 {
1356         struct tdb_wrap *recdb;
1357         int ret;
1358         struct ctdb_context *ctdb = rec->ctdb;
1359         TDB_DATA data;
1360         struct ctdb_control_wipe_database w;
1361         uint32_t *nodes;
1362
1363         recdb = create_recdb(ctdb, mem_ctx);
1364         if (recdb == NULL) {
1365                 return -1;
1366         }
1367
1368         /* pull all remote databases onto the recdb */
1369         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1370         if (ret != 0) {
1371                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1372                 return -1;
1373         }
1374
1375         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1376
1377         /* wipe all the remote databases. This is safe as we are in a transaction */
1378         w.db_id = dbid;
1379         w.transaction_id = transaction_id;
1380
1381         data.dptr = (void *)&w;
1382         data.dsize = sizeof(w);
1383
1384         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1385         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1386                                         nodes, 0,
1387                                         CONTROL_TIMEOUT(), false, data,
1388                                         NULL, NULL,
1389                                         NULL) != 0) {
1390                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1391                 talloc_free(recdb);
1392                 return -1;
1393         }
1394         
1395         /* push out the correct database. This sets the dmaster and skips 
1396            the empty records */
1397         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1398         if (ret != 0) {
1399                 talloc_free(recdb);
1400                 return -1;
1401         }
1402
1403         /* all done with this database */
1404         talloc_free(recdb);
1405
1406         return 0;
1407 }
1408
1409 /*
1410   reload the nodes file 
1411 */
1412 static void reload_nodes_file(struct ctdb_context *ctdb)
1413 {
1414         ctdb->nodes = NULL;
1415         ctdb_load_nodes_file(ctdb);
1416 }
1417
1418 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1419                                          struct ctdb_recoverd *rec,
1420                                          struct ctdb_node_map *nodemap,
1421                                          uint32_t *culprit)
1422 {
1423         int j;
1424         int ret;
1425
1426         if (ctdb->num_nodes != nodemap->num) {
1427                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1428                                   ctdb->num_nodes, nodemap->num));
1429                 if (culprit) {
1430                         *culprit = ctdb->pnn;
1431                 }
1432                 return -1;
1433         }
1434
1435         for (j=0; j<nodemap->num; j++) {
1436                 /* release any existing data */
1437                 if (ctdb->nodes[j]->known_public_ips) {
1438                         talloc_free(ctdb->nodes[j]->known_public_ips);
1439                         ctdb->nodes[j]->known_public_ips = NULL;
1440                 }
1441                 if (ctdb->nodes[j]->available_public_ips) {
1442                         talloc_free(ctdb->nodes[j]->available_public_ips);
1443                         ctdb->nodes[j]->available_public_ips = NULL;
1444                 }
1445
1446                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1447                         continue;
1448                 }
1449
1450                 /* grab a new shiny list of public ips from the node */
1451                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1452                                         CONTROL_TIMEOUT(),
1453                                         ctdb->nodes[j]->pnn,
1454                                         ctdb->nodes,
1455                                         0,
1456                                         &ctdb->nodes[j]->known_public_ips);
1457                 if (ret != 0) {
1458                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1459                                 ctdb->nodes[j]->pnn));
1460                         if (culprit) {
1461                                 *culprit = ctdb->nodes[j]->pnn;
1462                         }
1463                         return -1;
1464                 }
1465
1466                 if (ctdb->do_checkpublicip) {
1467                         if (rec->ip_check_disable_ctx == NULL) {
1468                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1469                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1470                                         rec->need_takeover_run = true;
1471                                 }
1472                         }
1473                 }
1474
1475                 /* grab a new shiny list of public ips from the node */
1476                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1477                                         CONTROL_TIMEOUT(),
1478                                         ctdb->nodes[j]->pnn,
1479                                         ctdb->nodes,
1480                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1481                                         &ctdb->nodes[j]->available_public_ips);
1482                 if (ret != 0) {
1483                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1484                                 ctdb->nodes[j]->pnn));
1485                         if (culprit) {
1486                                 *culprit = ctdb->nodes[j]->pnn;
1487                         }
1488                         return -1;
1489                 }
1490         }
1491
1492         return 0;
1493 }
1494
1495 /* when we start a recovery, make sure all nodes use the same reclock file
1496    setting
1497 */
1498 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1499 {
1500         struct ctdb_context *ctdb = rec->ctdb;
1501         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1502         TDB_DATA data;
1503         uint32_t *nodes;
1504
1505         if (ctdb->recovery_lock_file == NULL) {
1506                 data.dptr  = NULL;
1507                 data.dsize = 0;
1508         } else {
1509                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1510                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1511         }
1512
1513         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1514         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1515                                         nodes, 0,
1516                                         CONTROL_TIMEOUT(),
1517                                         false, data,
1518                                         NULL, NULL,
1519                                         rec) != 0) {
1520                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1521                 talloc_free(tmp_ctx);
1522                 return -1;
1523         }
1524
1525         talloc_free(tmp_ctx);
1526         return 0;
1527 }
1528
1529
1530 /*
1531  * this callback is called for every node that failed to execute ctdb_takeover_run()
1532  * and set flag to re-run takeover run.
1533  */
1534 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1535 {
1536         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1537
1538         if (callback_data != NULL) {
1539                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1540
1541                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1542
1543                 ctdb_set_culprit(rec, node_pnn);
1544                 rec->need_takeover_run = true;
1545         }
1546 }
1547
1548
1549 /*
1550   we are the recmaster, and recovery is needed - start a recovery run
1551  */
1552 static int do_recovery(struct ctdb_recoverd *rec, 
1553                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1554                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1555 {
1556         struct ctdb_context *ctdb = rec->ctdb;
1557         int i, j, ret;
1558         uint32_t generation;
1559         struct ctdb_dbid_map *dbmap;
1560         TDB_DATA data;
1561         uint32_t *nodes;
1562         struct timeval start_time;
1563         uint32_t culprit = (uint32_t)-1;
1564
1565         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1566
1567         /* if recovery fails, force it again */
1568         rec->need_recovery = true;
1569
1570         for (i=0; i<ctdb->num_nodes; i++) {
1571                 struct ctdb_banning_state *ban_state;
1572
1573                 if (ctdb->nodes[i]->ban_state == NULL) {
1574                         continue;
1575                 }
1576                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1577                 if (ban_state->count < 2*ctdb->num_nodes) {
1578                         continue;
1579                 }
1580                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1581                         ctdb->nodes[i]->pnn, ban_state->count,
1582                         ctdb->tunable.recovery_ban_period));
1583                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1584                 ban_state->count = 0;
1585         }
1586
1587
1588         if (ctdb->tunable.verify_recovery_lock != 0) {
1589                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1590                 start_time = timeval_current();
1591                 if (!ctdb_recovery_lock(ctdb, true)) {
1592                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1593                                          "and ban ourself for %u seconds\n",
1594                                          ctdb->tunable.recovery_ban_period));
1595                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1596                         return -1;
1597                 }
1598                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1599                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1600         }
1601
1602         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1603
1604         /* get a list of all databases */
1605         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1606         if (ret != 0) {
1607                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1608                 return -1;
1609         }
1610
1611         /* we do the db creation before we set the recovery mode, so the freeze happens
1612            on all databases we will be dealing with. */
1613
1614         /* verify that we have all the databases any other node has */
1615         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1616         if (ret != 0) {
1617                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1618                 return -1;
1619         }
1620
1621         /* verify that all other nodes have all our databases */
1622         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1623         if (ret != 0) {
1624                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1625                 return -1;
1626         }
1627         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1628
1629         /* update the database priority for all remote databases */
1630         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1631         if (ret != 0) {
1632                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1633         }
1634         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1635
1636
1637         /* update all other nodes to use the same setting for reclock files
1638            as the local recovery master.
1639         */
1640         sync_recovery_lock_file_across_cluster(rec);
1641
1642         /* set recovery mode to active on all nodes */
1643         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1644         if (ret != 0) {
1645                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1646                 return -1;
1647         }
1648
1649         /* execute the "startrecovery" event script on all nodes */
1650         ret = run_startrecovery_eventscript(rec, nodemap);
1651         if (ret!=0) {
1652                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1653                 return -1;
1654         }
1655
1656         /*
1657           update all nodes to have the same flags that we have
1658          */
1659         for (i=0;i<nodemap->num;i++) {
1660                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1661                         continue;
1662                 }
1663
1664                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1665                 if (ret != 0) {
1666                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1667                         return -1;
1668                 }
1669         }
1670
1671         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1672
1673         /* pick a new generation number */
1674         generation = new_generation();
1675
1676         /* change the vnnmap on this node to use the new generation 
1677            number but not on any other nodes.
1678            this guarantees that if we abort the recovery prematurely
1679            for some reason (a node stops responding?)
1680            that we can just return immediately and we will reenter
1681            recovery shortly again.
1682            I.e. we deliberately leave the cluster with an inconsistent
1683            generation id to allow us to abort recovery at any stage and
1684            just restart it from scratch.
1685          */
1686         vnnmap->generation = generation;
1687         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1688         if (ret != 0) {
1689                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1690                 return -1;
1691         }
1692
1693         data.dptr = (void *)&generation;
1694         data.dsize = sizeof(uint32_t);
1695
1696         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1697         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1698                                         nodes, 0,
1699                                         CONTROL_TIMEOUT(), false, data,
1700                                         NULL,
1701                                         transaction_start_fail_callback,
1702                                         rec) != 0) {
1703                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1704                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1705                                         nodes, 0,
1706                                         CONTROL_TIMEOUT(), false, tdb_null,
1707                                         NULL,
1708                                         NULL,
1709                                         NULL) != 0) {
1710                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1711                 }
1712                 return -1;
1713         }
1714
1715         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1716
1717         for (i=0;i<dbmap->num;i++) {
1718                 ret = recover_database(rec, mem_ctx,
1719                                        dbmap->dbs[i].dbid,
1720                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1721                                        pnn, nodemap, generation);
1722                 if (ret != 0) {
1723                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1724                         return -1;
1725                 }
1726         }
1727
1728         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1729
1730         /* commit all the changes */
1731         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1732                                         nodes, 0,
1733                                         CONTROL_TIMEOUT(), false, data,
1734                                         NULL, NULL,
1735                                         NULL) != 0) {
1736                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1737                 return -1;
1738         }
1739
1740         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1741         
1742
1743         /* update the capabilities for all nodes */
1744         ret = update_capabilities(ctdb, nodemap);
1745         if (ret!=0) {
1746                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1747                 return -1;
1748         }
1749
1750         /* build a new vnn map with all the currently active and
1751            unbanned nodes */
1752         generation = new_generation();
1753         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1754         CTDB_NO_MEMORY(ctdb, vnnmap);
1755         vnnmap->generation = generation;
1756         vnnmap->size = 0;
1757         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1758         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1759         for (i=j=0;i<nodemap->num;i++) {
1760                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1761                         continue;
1762                 }
1763                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1764                         /* this node can not be an lmaster */
1765                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1766                         continue;
1767                 }
1768
1769                 vnnmap->size++;
1770                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1771                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1772                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1773
1774         }
1775         if (vnnmap->size == 0) {
1776                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1777                 vnnmap->size++;
1778                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1779                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1780                 vnnmap->map[0] = pnn;
1781         }       
1782
1783         /* update to the new vnnmap on all nodes */
1784         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1785         if (ret != 0) {
1786                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1787                 return -1;
1788         }
1789
1790         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1791
1792         /* update recmaster to point to us for all nodes */
1793         ret = set_recovery_master(ctdb, nodemap, pnn);
1794         if (ret!=0) {
1795                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1796                 return -1;
1797         }
1798
1799         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1800
1801         /*
1802           update all nodes to have the same flags that we have
1803          */
1804         for (i=0;i<nodemap->num;i++) {
1805                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1806                         continue;
1807                 }
1808
1809                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1810                 if (ret != 0) {
1811                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1812                         return -1;
1813                 }
1814         }
1815
1816         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1817
1818         /* disable recovery mode */
1819         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1820         if (ret != 0) {
1821                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1822                 return -1;
1823         }
1824
1825         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1826
1827         /*
1828           tell nodes to takeover their public IPs
1829          */
1830         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1831         if (ret != 0) {
1832                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1833                                  culprit));
1834                 rec->need_takeover_run = true;
1835                 return -1;
1836         }
1837         rec->need_takeover_run = false;
1838         ret = ctdb_takeover_run(ctdb, nodemap, takeover_fail_callback, NULL);
1839         if (ret != 0) {
1840                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1841                 rec->need_takeover_run = true;
1842         }
1843
1844         /* execute the "recovered" event script on all nodes */
1845         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
1846         if (ret!=0) {
1847                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1848                 return -1;
1849         }
1850
1851         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1852
1853         /* send a message to all clients telling them that the cluster 
1854            has been reconfigured */
1855         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1856
1857         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1858
1859         rec->need_recovery = false;
1860
1861         /* we managed to complete a full recovery, make sure to forgive
1862            any past sins by the nodes that could now participate in the
1863            recovery.
1864         */
1865         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1866         for (i=0;i<nodemap->num;i++) {
1867                 struct ctdb_banning_state *ban_state;
1868
1869                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1870                         continue;
1871                 }
1872
1873                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1874                 if (ban_state == NULL) {
1875                         continue;
1876                 }
1877
1878                 ban_state->count = 0;
1879         }
1880
1881
1882         /* We just finished a recovery successfully. 
1883            We now wait for rerecovery_timeout before we allow 
1884            another recovery to take place.
1885         */
1886         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1887         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1888         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1889
1890         return 0;
1891 }
1892
1893
1894 /*
1895   elections are won by first checking the number of connected nodes, then
1896   the priority time, then the pnn
1897  */
1898 struct election_message {
1899         uint32_t num_connected;
1900         struct timeval priority_time;
1901         uint32_t pnn;
1902         uint32_t node_flags;
1903 };
1904
1905 /*
1906   form this nodes election data
1907  */
1908 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1909 {
1910         int ret, i;
1911         struct ctdb_node_map *nodemap;
1912         struct ctdb_context *ctdb = rec->ctdb;
1913
1914         ZERO_STRUCTP(em);
1915
1916         em->pnn = rec->ctdb->pnn;
1917         em->priority_time = rec->priority_time;
1918
1919         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1920         if (ret != 0) {
1921                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1922                 return;
1923         }
1924
1925         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1926         em->node_flags = rec->node_flags;
1927
1928         for (i=0;i<nodemap->num;i++) {
1929                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1930                         em->num_connected++;
1931                 }
1932         }
1933
1934         /* we shouldnt try to win this election if we cant be a recmaster */
1935         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1936                 em->num_connected = 0;
1937                 em->priority_time = timeval_current();
1938         }
1939
1940         talloc_free(nodemap);
1941 }
1942
1943 /*
1944   see if the given election data wins
1945  */
1946 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1947 {
1948         struct election_message myem;
1949         int cmp = 0;
1950
1951         ctdb_election_data(rec, &myem);
1952
1953         /* we cant win if we dont have the recmaster capability */
1954         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1955                 return false;
1956         }
1957
1958         /* we cant win if we are banned */
1959         if (rec->node_flags & NODE_FLAGS_BANNED) {
1960                 return false;
1961         }       
1962
1963         /* we cant win if we are stopped */
1964         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1965                 return false;
1966         }       
1967
1968         /* we will automatically win if the other node is banned */
1969         if (em->node_flags & NODE_FLAGS_BANNED) {
1970                 return true;
1971         }
1972
1973         /* we will automatically win if the other node is banned */
1974         if (em->node_flags & NODE_FLAGS_STOPPED) {
1975                 return true;
1976         }
1977
1978         /* try to use the most connected node */
1979         if (cmp == 0) {
1980                 cmp = (int)myem.num_connected - (int)em->num_connected;
1981         }
1982
1983         /* then the longest running node */
1984         if (cmp == 0) {
1985                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1986         }
1987
1988         if (cmp == 0) {
1989                 cmp = (int)myem.pnn - (int)em->pnn;
1990         }
1991
1992         return cmp > 0;
1993 }
1994
1995 /*
1996   send out an election request
1997  */
1998 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1999 {
2000         int ret;
2001         TDB_DATA election_data;
2002         struct election_message emsg;
2003         uint64_t srvid;
2004         struct ctdb_context *ctdb = rec->ctdb;
2005
2006         srvid = CTDB_SRVID_RECOVERY;
2007
2008         ctdb_election_data(rec, &emsg);
2009
2010         election_data.dsize = sizeof(struct election_message);
2011         election_data.dptr  = (unsigned char *)&emsg;
2012
2013
2014         /* send an election message to all active nodes */
2015         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2016         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2017
2018
2019         /* A new node that is already frozen has entered the cluster.
2020            The existing nodes are not frozen and dont need to be frozen
2021            until the election has ended and we start the actual recovery
2022         */
2023         if (update_recmaster == true) {
2024                 /* first we assume we will win the election and set 
2025                    recoverymaster to be ourself on the current node
2026                  */
2027                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2028                 if (ret != 0) {
2029                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2030                         return -1;
2031                 }
2032         }
2033
2034
2035         return 0;
2036 }
2037
2038 /*
2039   this function will unban all nodes in the cluster
2040 */
2041 static void unban_all_nodes(struct ctdb_context *ctdb)
2042 {
2043         int ret, i;
2044         struct ctdb_node_map *nodemap;
2045         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2046         
2047         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2048         if (ret != 0) {
2049                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2050                 return;
2051         }
2052
2053         for (i=0;i<nodemap->num;i++) {
2054                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2055                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2056                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
2057                 }
2058         }
2059
2060         talloc_free(tmp_ctx);
2061 }
2062
2063
2064 /*
2065   we think we are winning the election - send a broadcast election request
2066  */
2067 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2068 {
2069         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2070         int ret;
2071
2072         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
2073         if (ret != 0) {
2074                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2075         }
2076
2077         talloc_free(rec->send_election_te);
2078         rec->send_election_te = NULL;
2079 }
2080
2081 /*
2082   handler for memory dumps
2083 */
2084 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2085                              TDB_DATA data, void *private_data)
2086 {
2087         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2088         TDB_DATA *dump;
2089         int ret;
2090         struct rd_memdump_reply *rd;
2091
2092         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2093                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2094                 talloc_free(tmp_ctx);
2095                 return;
2096         }
2097         rd = (struct rd_memdump_reply *)data.dptr;
2098
2099         dump = talloc_zero(tmp_ctx, TDB_DATA);
2100         if (dump == NULL) {
2101                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2102                 talloc_free(tmp_ctx);
2103                 return;
2104         }
2105         ret = ctdb_dump_memory(ctdb, dump);
2106         if (ret != 0) {
2107                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2108                 talloc_free(tmp_ctx);
2109                 return;
2110         }
2111
2112 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
2113
2114         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2115         if (ret != 0) {
2116                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2117                 talloc_free(tmp_ctx);
2118                 return;
2119         }
2120
2121         talloc_free(tmp_ctx);
2122 }
2123
2124 /*
2125   handler for getlog
2126 */
2127 static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2128                            TDB_DATA data, void *private_data)
2129 {
2130         struct ctdb_get_log_addr *log_addr;
2131         pid_t child;
2132
2133         if (data.dsize != sizeof(struct ctdb_get_log_addr)) {
2134                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2135                 return;
2136         }
2137         log_addr = (struct ctdb_get_log_addr *)data.dptr;
2138
2139         child = ctdb_fork_no_free_ringbuffer(ctdb);
2140         if (child == (pid_t)-1) {
2141                 DEBUG(DEBUG_ERR,("Failed to fork a log collector child\n"));
2142                 return;
2143         }
2144
2145         if (child == 0) {
2146                 if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
2147                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch log collector child into client mode.\n"));
2148                         _exit(1);
2149                 }
2150                 ctdb_collect_log(ctdb, log_addr);
2151                 _exit(0);
2152         }
2153 }
2154
2155 /*
2156   handler for clearlog
2157 */
2158 static void clearlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2159                              TDB_DATA data, void *private_data)
2160 {
2161         ctdb_clear_log(ctdb);
2162 }
2163
2164 /*
2165   handler for reload_nodes
2166 */
2167 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2168                              TDB_DATA data, void *private_data)
2169 {
2170         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2171
2172         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2173
2174         reload_nodes_file(rec->ctdb);
2175 }
2176
2177
2178 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
2179                               struct timeval yt, void *p)
2180 {
2181         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2182
2183         talloc_free(rec->ip_check_disable_ctx);
2184         rec->ip_check_disable_ctx = NULL;
2185 }
2186
2187
2188 static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
2189                                   struct timeval t, void *p)
2190 {
2191         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2192         struct ctdb_context *ctdb = rec->ctdb;
2193         int ret;
2194
2195         DEBUG(DEBUG_NOTICE,("Rebalance all nodes that have had ip assignment changes.\n"));
2196
2197         ret = ctdb_takeover_run(ctdb, rec->nodemap, takeover_fail_callback, NULL);
2198         if (ret != 0) {
2199                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
2200                 rec->need_takeover_run = true;
2201         }
2202
2203         talloc_free(rec->deferred_rebalance_ctx);
2204         rec->deferred_rebalance_ctx = NULL;
2205 }
2206
2207         
2208 static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2209                              TDB_DATA data, void *private_data)
2210 {
2211         uint32_t pnn;
2212         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2213
2214         if (data.dsize != sizeof(uint32_t)) {
2215                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2216                 return;
2217         }
2218
2219         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2220                 return;
2221         }
2222
2223         pnn = *(uint32_t *)&data.dptr[0];
2224
2225         lcp2_forcerebalance(ctdb, pnn);
2226         DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
2227
2228         if (rec->deferred_rebalance_ctx != NULL) {
2229                 talloc_free(rec->deferred_rebalance_ctx);
2230         }
2231         rec->deferred_rebalance_ctx = talloc_new(rec);
2232         event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
2233                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2234                         ctdb_rebalance_timeout, rec);
2235 }
2236
2237
2238
2239 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2240                              TDB_DATA data, void *private_data)
2241 {
2242         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2243         struct ctdb_public_ip *ip;
2244
2245         if (rec->recmaster != rec->ctdb->pnn) {
2246                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2247                 return;
2248         }
2249
2250         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2251                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2252                 return;
2253         }
2254
2255         ip = (struct ctdb_public_ip *)data.dptr;
2256
2257         update_ip_assignment_tree(rec->ctdb, ip);
2258 }
2259
2260
2261 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2262                              TDB_DATA data, void *private_data)
2263 {
2264         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2265         uint32_t timeout;
2266
2267         if (rec->ip_check_disable_ctx != NULL) {
2268                 talloc_free(rec->ip_check_disable_ctx);
2269                 rec->ip_check_disable_ctx = NULL;
2270         }
2271
2272         if (data.dsize != sizeof(uint32_t)) {
2273                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2274                                  "expexting %lu\n", (long unsigned)data.dsize,
2275                                  (long unsigned)sizeof(uint32_t)));
2276                 return;
2277         }
2278         if (data.dptr == NULL) {
2279                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
2280                 return;
2281         }
2282
2283         timeout = *((uint32_t *)data.dptr);
2284
2285         if (timeout == 0) {
2286                 DEBUG(DEBUG_NOTICE,("Reenabling ip check\n"));
2287                 return;
2288         }
2289                 
2290         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
2291
2292         rec->ip_check_disable_ctx = talloc_new(rec);
2293         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2294
2295         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2296 }
2297
2298
2299 /*
2300   handler for reload all ips.
2301 */
2302 static void ip_reloadall_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2303                              TDB_DATA data, void *private_data)
2304 {
2305         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2306
2307         if (data.dsize != sizeof(struct reloadips_all_reply)) {
2308                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2309                 return;
2310         }
2311
2312         reload_all_ips_request = (struct reloadips_all_reply *)talloc_steal(rec, data.dptr);
2313
2314         DEBUG(DEBUG_NOTICE,("RELOAD_ALL_IPS message received from node:%d srvid:%d\n", reload_all_ips_request->pnn, (int)reload_all_ips_request->srvid));
2315         return;
2316 }
2317
2318 static void async_reloadips_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2319 {
2320         uint32_t *status = callback_data;
2321
2322         if (res != 0) {
2323                 DEBUG(DEBUG_ERR,("Reload ips all failed on node %d\n", node_pnn));
2324                 *status = 1;
2325         }
2326 }
2327
2328 static int
2329 reload_all_ips(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, struct reloadips_all_reply *rips)
2330 {
2331         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2332         uint32_t *nodes;
2333         uint32_t status;
2334         int i;
2335
2336         DEBUG(DEBUG_ERR,("RELOAD ALL IPS on all active nodes\n"));
2337         for (i = 0; i< nodemap->num; i++) {
2338                 if (nodemap->nodes[i].flags != 0) {
2339                         DEBUG(DEBUG_ERR, ("Can not reload ips on all nodes. Node %d is not up and healthy\n", i));
2340                         talloc_free(tmp_ctx);
2341                         return -1;
2342                 }
2343         }
2344
2345         /* send the flags update to all connected nodes */
2346         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2347         status = 0;
2348         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
2349                                         nodes, 0,
2350                                         CONTROL_TIMEOUT(),
2351                                         false, tdb_null,
2352                                         async_reloadips_callback, NULL,
2353                                         &status) != 0) {
2354                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2355                 talloc_free(tmp_ctx);
2356                 return -1;
2357         }
2358
2359         if (status != 0) {
2360                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2361                 talloc_free(tmp_ctx);
2362                 return -1;
2363         }
2364
2365         ctdb_client_send_message(ctdb, rips->pnn, rips->srvid, tdb_null);
2366
2367         talloc_free(tmp_ctx);
2368         return 0;
2369 }
2370
2371
2372 /*
2373   handler for ip reallocate, just add it to the list of callers and 
2374   handle this later in the monitor_cluster loop so we do not recurse
2375   with other callers to takeover_run()
2376 */
2377 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2378                              TDB_DATA data, void *private_data)
2379 {
2380         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2381         struct ip_reallocate_list *caller;
2382
2383         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2384                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2385                 return;
2386         }
2387
2388         if (rec->ip_reallocate_ctx == NULL) {
2389                 rec->ip_reallocate_ctx = talloc_new(rec);
2390                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2391         }
2392
2393         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2394         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2395
2396         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2397         caller->next = rec->reallocate_callers;
2398         rec->reallocate_callers = caller;
2399
2400         return;
2401 }
2402
2403 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2404 {
2405         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2406         TDB_DATA result;
2407         int32_t ret;
2408         struct ip_reallocate_list *callers;
2409         uint32_t culprit;
2410
2411         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2412
2413         /* update the list of public ips that a node can handle for
2414            all connected nodes
2415         */
2416         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2417         if (ret != 0) {
2418                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2419                                  culprit));
2420                 rec->need_takeover_run = true;
2421         }
2422         if (ret == 0) {
2423                 ret = ctdb_takeover_run(ctdb, rec->nodemap, takeover_fail_callback, NULL);
2424                 if (ret != 0) {
2425                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2426                         rec->need_takeover_run = true;
2427                 }
2428         }
2429
2430         result.dsize = sizeof(int32_t);
2431         result.dptr  = (uint8_t *)&ret;
2432
2433         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2434
2435                 /* Someone that sent srvid==0 does not want a reply */
2436                 if (callers->rd->srvid == 0) {
2437                         continue;
2438                 }
2439                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2440                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2441                                   (unsigned long long)callers->rd->srvid));
2442                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2443                 if (ret != 0) {
2444                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2445                                          "message to %u:%llu\n",
2446                                          (unsigned)callers->rd->pnn,
2447                                          (unsigned long long)callers->rd->srvid));
2448                 }
2449         }
2450
2451         talloc_free(tmp_ctx);
2452         talloc_free(rec->ip_reallocate_ctx);
2453         rec->ip_reallocate_ctx = NULL;
2454         rec->reallocate_callers = NULL;
2455         
2456 }
2457
2458
2459 /*
2460   handler for recovery master elections
2461 */
2462 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2463                              TDB_DATA data, void *private_data)
2464 {
2465         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2466         int ret;
2467         struct election_message *em = (struct election_message *)data.dptr;
2468         TALLOC_CTX *mem_ctx;
2469
2470         /* we got an election packet - update the timeout for the election */
2471         talloc_free(rec->election_timeout);
2472         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2473                                                 fast_start ?
2474                                                 timeval_current_ofs(0, 500000) :
2475                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2476                                                 ctdb_election_timeout, rec);
2477
2478         mem_ctx = talloc_new(ctdb);
2479
2480         /* someone called an election. check their election data
2481            and if we disagree and we would rather be the elected node, 
2482            send a new election message to all other nodes
2483          */
2484         if (ctdb_election_win(rec, em)) {
2485                 if (!rec->send_election_te) {
2486                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2487                                                                 timeval_current_ofs(0, 500000),
2488                                                                 election_send_request, rec);
2489                 }
2490                 talloc_free(mem_ctx);
2491                 /*unban_all_nodes(ctdb);*/
2492                 return;
2493         }
2494         
2495         /* we didn't win */
2496         talloc_free(rec->send_election_te);
2497         rec->send_election_te = NULL;
2498
2499         if (ctdb->tunable.verify_recovery_lock != 0) {
2500                 /* release the recmaster lock */
2501                 if (em->pnn != ctdb->pnn &&
2502                     ctdb->recovery_lock_fd != -1) {
2503                         close(ctdb->recovery_lock_fd);
2504                         ctdb->recovery_lock_fd = -1;
2505                         unban_all_nodes(ctdb);
2506                 }
2507         }
2508
2509         /* ok, let that guy become recmaster then */
2510         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2511         if (ret != 0) {
2512                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2513                 talloc_free(mem_ctx);
2514                 return;
2515         }
2516
2517         talloc_free(mem_ctx);
2518         return;
2519 }
2520
2521
2522 /*
2523   force the start of the election process
2524  */
2525 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2526                            struct ctdb_node_map *nodemap)
2527 {
2528         int ret;
2529         struct ctdb_context *ctdb = rec->ctdb;
2530
2531         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2532
2533         /* set all nodes to recovery mode to stop all internode traffic */
2534         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2535         if (ret != 0) {
2536                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2537                 return;
2538         }
2539
2540         talloc_free(rec->election_timeout);
2541         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2542                                                 fast_start ?
2543                                                 timeval_current_ofs(0, 500000) :
2544                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2545                                                 ctdb_election_timeout, rec);
2546
2547         ret = send_election_request(rec, pnn, true);
2548         if (ret!=0) {
2549                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2550                 return;
2551         }
2552
2553         /* wait for a few seconds to collect all responses */
2554         ctdb_wait_election(rec);
2555 }
2556
2557
2558
2559 /*
2560   handler for when a node changes its flags
2561 */
2562 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2563                             TDB_DATA data, void *private_data)
2564 {
2565         int ret;
2566         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2567         struct ctdb_node_map *nodemap=NULL;
2568         TALLOC_CTX *tmp_ctx;
2569         int i;
2570         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2571         int disabled_flag_changed;
2572
2573         if (data.dsize != sizeof(*c)) {
2574                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2575                 return;
2576         }
2577
2578         tmp_ctx = talloc_new(ctdb);
2579         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2580
2581         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2582         if (ret != 0) {
2583                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2584                 talloc_free(tmp_ctx);
2585                 return;         
2586         }
2587
2588
2589         for (i=0;i<nodemap->num;i++) {
2590                 if (nodemap->nodes[i].pnn == c->pnn) break;
2591         }
2592
2593         if (i == nodemap->num) {
2594                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2595                 talloc_free(tmp_ctx);
2596                 return;
2597         }
2598
2599         if (nodemap->nodes[i].flags != c->new_flags) {
2600                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, nodemap->nodes[i].flags));
2601         }
2602
2603         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2604
2605         nodemap->nodes[i].flags = c->new_flags;
2606
2607         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2608                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2609
2610         if (ret == 0) {
2611                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2612                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2613         }
2614         
2615         if (ret == 0 &&
2616             ctdb->recovery_master == ctdb->pnn &&
2617             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2618                 /* Only do the takeover run if the perm disabled or unhealthy
2619                    flags changed since these will cause an ip failover but not
2620                    a recovery.
2621                    If the node became disconnected or banned this will also
2622                    lead to an ip address failover but that is handled 
2623                    during recovery
2624                 */
2625                 if (disabled_flag_changed) {
2626                         rec->need_takeover_run = true;
2627                 }
2628         }
2629
2630         talloc_free(tmp_ctx);
2631 }
2632
2633 /*
2634   handler for when we need to push out flag changes ot all other nodes
2635 */
2636 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2637                             TDB_DATA data, void *private_data)
2638 {
2639         int ret;
2640         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2641         struct ctdb_node_map *nodemap=NULL;
2642         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2643         uint32_t recmaster;
2644         uint32_t *nodes;
2645
2646         /* find the recovery master */
2647         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2648         if (ret != 0) {
2649                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2650                 talloc_free(tmp_ctx);
2651                 return;
2652         }
2653
2654         /* read the node flags from the recmaster */
2655         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2656         if (ret != 0) {
2657                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2658                 talloc_free(tmp_ctx);
2659                 return;
2660         }
2661         if (c->pnn >= nodemap->num) {
2662                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2663                 talloc_free(tmp_ctx);
2664                 return;
2665         }
2666
2667         /* send the flags update to all connected nodes */
2668         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2669
2670         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2671                                       nodes, 0, CONTROL_TIMEOUT(),
2672                                       false, data,
2673                                       NULL, NULL,
2674                                       NULL) != 0) {
2675                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2676
2677                 talloc_free(tmp_ctx);
2678                 return;
2679         }
2680
2681         talloc_free(tmp_ctx);
2682 }
2683
2684
2685 struct verify_recmode_normal_data {
2686         uint32_t count;
2687         enum monitor_result status;
2688 };
2689
2690 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2691 {
2692         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2693
2694
2695         /* one more node has responded with recmode data*/
2696         rmdata->count--;
2697
2698         /* if we failed to get the recmode, then return an error and let
2699            the main loop try again.
2700         */
2701         if (state->state != CTDB_CONTROL_DONE) {
2702                 if (rmdata->status == MONITOR_OK) {
2703                         rmdata->status = MONITOR_FAILED;
2704                 }
2705                 return;
2706         }
2707
2708         /* if we got a response, then the recmode will be stored in the
2709            status field
2710         */
2711         if (state->status != CTDB_RECOVERY_NORMAL) {
2712                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2713                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2714         }
2715
2716         return;
2717 }
2718
2719
2720 /* verify that all nodes are in normal recovery mode */
2721 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2722 {
2723         struct verify_recmode_normal_data *rmdata;
2724         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2725         struct ctdb_client_control_state *state;
2726         enum monitor_result status;
2727         int j;
2728         
2729         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2730         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2731         rmdata->count  = 0;
2732         rmdata->status = MONITOR_OK;
2733
2734         /* loop over all active nodes and send an async getrecmode call to 
2735            them*/
2736         for (j=0; j<nodemap->num; j++) {
2737                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2738                         continue;
2739                 }
2740                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2741                                         CONTROL_TIMEOUT(), 
2742                                         nodemap->nodes[j].pnn);
2743                 if (state == NULL) {
2744                         /* we failed to send the control, treat this as 
2745                            an error and try again next iteration
2746                         */                      
2747                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2748                         talloc_free(mem_ctx);
2749                         return MONITOR_FAILED;
2750                 }
2751
2752                 /* set up the callback functions */
2753                 state->async.fn = verify_recmode_normal_callback;
2754                 state->async.private_data = rmdata;
2755
2756                 /* one more control to wait for to complete */
2757                 rmdata->count++;
2758         }
2759
2760
2761         /* now wait for up to the maximum number of seconds allowed
2762            or until all nodes we expect a response from has replied
2763         */
2764         while (rmdata->count > 0) {
2765                 event_loop_once(ctdb->ev);
2766         }
2767
2768         status = rmdata->status;
2769         talloc_free(mem_ctx);
2770         return status;
2771 }
2772
2773
2774 struct verify_recmaster_data {
2775         struct ctdb_recoverd *rec;
2776         uint32_t count;
2777         uint32_t pnn;
2778         enum monitor_result status;
2779 };
2780
2781 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2782 {
2783         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2784
2785
2786         /* one more node has responded with recmaster data*/
2787         rmdata->count--;
2788
2789         /* if we failed to get the recmaster, then return an error and let
2790            the main loop try again.
2791         */
2792         if (state->state != CTDB_CONTROL_DONE) {
2793                 if (rmdata->status == MONITOR_OK) {
2794                         rmdata->status = MONITOR_FAILED;
2795                 }
2796                 return;
2797         }
2798
2799         /* if we got a response, then the recmaster will be stored in the
2800            status field
2801         */
2802         if (state->status != rmdata->pnn) {
2803                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2804                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2805                 rmdata->status = MONITOR_ELECTION_NEEDED;
2806         }
2807
2808         return;
2809 }
2810
2811
2812 /* verify that all nodes agree that we are the recmaster */
2813 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2814 {
2815         struct ctdb_context *ctdb = rec->ctdb;
2816         struct verify_recmaster_data *rmdata;
2817         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2818         struct ctdb_client_control_state *state;
2819         enum monitor_result status;
2820         int j;
2821         
2822         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2823         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2824         rmdata->rec    = rec;
2825         rmdata->count  = 0;
2826         rmdata->pnn    = pnn;
2827         rmdata->status = MONITOR_OK;
2828
2829         /* loop over all active nodes and send an async getrecmaster call to 
2830            them*/
2831         for (j=0; j<nodemap->num; j++) {
2832                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2833                         continue;
2834                 }
2835                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2836                                         CONTROL_TIMEOUT(),
2837                                         nodemap->nodes[j].pnn);
2838                 if (state == NULL) {
2839                         /* we failed to send the control, treat this as 
2840                            an error and try again next iteration
2841                         */                      
2842                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2843                         talloc_free(mem_ctx);
2844                         return MONITOR_FAILED;
2845                 }
2846
2847                 /* set up the callback functions */
2848                 state->async.fn = verify_recmaster_callback;
2849                 state->async.private_data = rmdata;
2850
2851                 /* one more control to wait for to complete */
2852                 rmdata->count++;
2853         }
2854
2855
2856         /* now wait for up to the maximum number of seconds allowed
2857            or until all nodes we expect a response from has replied
2858         */
2859         while (rmdata->count > 0) {
2860                 event_loop_once(ctdb->ev);
2861         }
2862
2863         status = rmdata->status;
2864         talloc_free(mem_ctx);
2865         return status;
2866 }
2867
2868 static bool interfaces_have_changed(struct ctdb_context *ctdb,
2869                                     struct ctdb_recoverd *rec)
2870 {
2871         struct ctdb_control_get_ifaces *ifaces = NULL;
2872         TALLOC_CTX *mem_ctx;
2873         bool ret = false;
2874
2875         mem_ctx = talloc_new(NULL);
2876
2877         /* Read the interfaces from the local node */
2878         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
2879                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
2880                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
2881                 /* We could return an error.  However, this will be
2882                  * rare so we'll decide that the interfaces have
2883                  * actually changed, just in case.
2884                  */
2885                 talloc_free(mem_ctx);
2886                 return true;
2887         }
2888
2889         if (!rec->ifaces) {
2890                 /* We haven't been here before so things have changed */
2891                 ret = true;
2892         } else if (rec->ifaces->num != ifaces->num) {
2893                 /* Number of interfaces has changed */
2894                 ret = true;
2895         } else {
2896                 /* See if interface names or link states have changed */
2897                 int i;
2898                 for (i = 0; i < rec->ifaces->num; i++) {
2899                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
2900                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0 ||
2901                             iface->link_state != ifaces->ifaces[i].link_state) {
2902                                 ret = true;
2903                                 break;
2904                         }
2905                 }
2906         }
2907
2908         talloc_free(rec->ifaces);
2909         rec->ifaces = talloc_steal(rec, ifaces);
2910
2911         talloc_free(mem_ctx);
2912         return ret;
2913 }
2914
2915 /* called to check that the local allocation of public ip addresses is ok.
2916 */
2917 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2918 {
2919         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2920         struct ctdb_uptime *uptime1 = NULL;
2921         struct ctdb_uptime *uptime2 = NULL;
2922         int ret, j;
2923         bool need_takeover_run = false;
2924
2925         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2926                                 CTDB_CURRENT_NODE, &uptime1);
2927         if (ret != 0) {
2928                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2929                 talloc_free(mem_ctx);
2930                 return -1;
2931         }
2932
2933         if (interfaces_have_changed(ctdb, rec)) {
2934                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2935                                      "local node %u - force takeover run\n",
2936                                      pnn));
2937                 need_takeover_run = true;
2938         }
2939
2940         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2941                                 CTDB_CURRENT_NODE, &uptime2);
2942         if (ret != 0) {
2943                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2944                 talloc_free(mem_ctx);
2945                 return -1;
2946         }
2947
2948         /* skip the check if the startrecovery time has changed */
2949         if (timeval_compare(&uptime1->last_recovery_started,
2950                             &uptime2->last_recovery_started) != 0) {
2951                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2952                 talloc_free(mem_ctx);
2953                 return 0;
2954         }
2955
2956         /* skip the check if the endrecovery time has changed */
2957         if (timeval_compare(&uptime1->last_recovery_finished,
2958                             &uptime2->last_recovery_finished) != 0) {
2959                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2960                 talloc_free(mem_ctx);
2961                 return 0;
2962         }
2963
2964         /* skip the check if we have started but not finished recovery */
2965         if (timeval_compare(&uptime1->last_recovery_finished,
2966                             &uptime1->last_recovery_started) != 1) {
2967                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2968                 talloc_free(mem_ctx);
2969
2970                 return 0;
2971         }
2972
2973         /* verify that we have the ip addresses we should have
2974            and we dont have ones we shouldnt have.
2975            if we find an inconsistency we set recmode to
2976            active on the local node and wait for the recmaster
2977            to do a full blown recovery.
2978            also if the pnn is -1 and we are healthy and can host the ip
2979            we also request a ip reallocation.
2980         */
2981         if (ctdb->tunable.disable_ip_failover == 0) {
2982                 struct ctdb_all_public_ips *ips = NULL;
2983
2984                 /* read the *available* IPs from the local node */
2985                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
2986                 if (ret != 0) {
2987                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
2988                         talloc_free(mem_ctx);
2989                         return -1;
2990                 }
2991
2992                 for (j=0; j<ips->num; j++) {
2993                         if (ips->ips[j].pnn == -1 &&
2994                             nodemap->nodes[pnn].flags == 0) {
2995                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
2996                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
2997                                 need_takeover_run = true;
2998                         }
2999                 }
3000
3001                 talloc_free(ips);
3002
3003                 /* read the *known* IPs from the local node */
3004                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3005                 if (ret != 0) {
3006                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3007                         talloc_free(mem_ctx);
3008                         return -1;
3009                 }
3010
3011                 for (j=0; j<ips->num; j++) {
3012                         if (ips->ips[j].pnn == pnn) {
3013                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3014                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3015                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3016                                         need_takeover_run = true;
3017                                 }
3018                         } else {
3019                                 if (ctdb->do_checkpublicip &&
3020                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3021
3022                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3023                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3024
3025                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3026                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3027                                         }
3028                                 }
3029                         }
3030                 }
3031         }
3032
3033         if (need_takeover_run) {
3034                 struct takeover_run_reply rd;
3035                 TDB_DATA data;
3036
3037                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3038
3039                 rd.pnn = ctdb->pnn;
3040                 rd.srvid = 0;
3041                 data.dptr = (uint8_t *)&rd;
3042                 data.dsize = sizeof(rd);
3043
3044                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3045                 if (ret != 0) {
3046                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3047                 }
3048         }
3049         talloc_free(mem_ctx);
3050         return 0;
3051 }
3052
3053
3054 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3055 {
3056         struct ctdb_node_map **remote_nodemaps = callback_data;
3057
3058         if (node_pnn >= ctdb->num_nodes) {
3059                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3060                 return;
3061         }
3062
3063         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3064
3065 }
3066
3067 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3068         struct ctdb_node_map *nodemap,
3069         struct ctdb_node_map **remote_nodemaps)
3070 {
3071         uint32_t *nodes;
3072
3073         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3074         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3075                                         nodes, 0,
3076                                         CONTROL_TIMEOUT(), false, tdb_null,
3077                                         async_getnodemap_callback,
3078                                         NULL,
3079                                         remote_nodemaps) != 0) {
3080                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3081
3082                 return -1;
3083         }
3084
3085         return 0;
3086 }
3087
3088 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
3089 struct ctdb_check_reclock_state {
3090         struct ctdb_context *ctdb;
3091         struct timeval start_time;
3092         int fd[2];
3093         pid_t child;
3094         struct timed_event *te;
3095         struct fd_event *fde;
3096         enum reclock_child_status status;
3097 };
3098
3099 /* when we free the reclock state we must kill any child process.
3100 */
3101 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
3102 {
3103         struct ctdb_context *ctdb = state->ctdb;
3104
3105         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
3106
3107         if (state->fd[0] != -1) {
3108                 close(state->fd[0]);
3109                 state->fd[0] = -1;
3110         }
3111         if (state->fd[1] != -1) {
3112                 close(state->fd[1]);
3113                 state->fd[1] = -1;
3114         }
3115         ctdb_kill(ctdb, state->child, SIGKILL);
3116         return 0;
3117 }
3118
3119 /*
3120   called if our check_reclock child times out. this would happen if
3121   i/o to the reclock file blocks.
3122  */
3123 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
3124                                          struct timeval t, void *private_data)
3125 {
3126         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
3127                                            struct ctdb_check_reclock_state);
3128
3129         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
3130         state->status = RECLOCK_TIMEOUT;
3131 }
3132
3133 /* this is called when the child process has completed checking the reclock
3134    file and has written data back to us through the pipe.
3135 */
3136 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
3137                              uint16_t flags, void *private_data)
3138 {
3139         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
3140                                              struct ctdb_check_reclock_state);
3141         char c = 0;
3142         int ret;
3143
3144         /* we got a response from our child process so we can abort the
3145            timeout.
3146         */
3147         talloc_free(state->te);
3148         state->te = NULL;
3149
3150         ret = read(state->fd[0], &c, 1);
3151         if (ret != 1 || c != RECLOCK_OK) {
3152                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3153                 state->status = RECLOCK_FAILED;
3154
3155                 return;
3156         }
3157
3158         state->status = RECLOCK_OK;
3159         return;
3160 }
3161
3162 static int check_recovery_lock(struct ctdb_context *ctdb)
3163 {
3164         int ret;
3165         struct ctdb_check_reclock_state *state;
3166         pid_t parent = getpid();
3167
3168         if (ctdb->recovery_lock_fd == -1) {
3169                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3170                 return -1;
3171         }
3172
3173         state = talloc(ctdb, struct ctdb_check_reclock_state);
3174         CTDB_NO_MEMORY(ctdb, state);
3175
3176         state->ctdb = ctdb;
3177         state->start_time = timeval_current();
3178         state->status = RECLOCK_CHECKING;
3179         state->fd[0] = -1;
3180         state->fd[1] = -1;
3181
3182         ret = pipe(state->fd);
3183         if (ret != 0) {
3184                 talloc_free(state);
3185                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3186                 return -1;
3187         }
3188
3189         state->child = ctdb_fork(ctdb);
3190         if (state->child == (pid_t)-1) {
3191                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3192                 close(state->fd[0]);
3193                 state->fd[0] = -1;
3194                 close(state->fd[1]);
3195                 state->fd[1] = -1;
3196                 talloc_free(state);
3197                 return -1;
3198         }
3199
3200         if (state->child == 0) {
3201                 char cc = RECLOCK_OK;
3202                 close(state->fd[0]);
3203                 state->fd[0] = -1;
3204
3205                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3206                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3207                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3208                         cc = RECLOCK_FAILED;
3209                 }
3210
3211                 write(state->fd[1], &cc, 1);
3212                 /* make sure we die when our parent dies */
3213                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3214                         sleep(5);
3215                         write(state->fd[1], &cc, 1);
3216                 }
3217                 _exit(0);
3218         }
3219         close(state->fd[1]);
3220         state->fd[1] = -1;
3221         set_close_on_exec(state->fd[0]);
3222
3223         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3224
3225         talloc_set_destructor(state, check_reclock_destructor);
3226
3227         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3228                                     ctdb_check_reclock_timeout, state);
3229         if (state->te == NULL) {
3230                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3231                 talloc_free(state);
3232                 return -1;
3233         }
3234
3235         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3236                                 EVENT_FD_READ,
3237                                 reclock_child_handler,
3238                                 (void *)state);
3239
3240         if (state->fde == NULL) {
3241                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3242                 talloc_free(state);
3243                 return -1;
3244         }
3245         tevent_fd_set_auto_close(state->fde);
3246
3247         while (state->status == RECLOCK_CHECKING) {
3248                 event_loop_once(ctdb->ev);
3249         }
3250
3251         if (state->status == RECLOCK_FAILED) {
3252                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3253                 close(ctdb->recovery_lock_fd);
3254                 ctdb->recovery_lock_fd = -1;
3255                 talloc_free(state);
3256                 return -1;
3257         }
3258
3259         talloc_free(state);
3260         return 0;
3261 }
3262
3263 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3264 {
3265         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3266         const char *reclockfile;
3267
3268         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3269                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3270                 talloc_free(tmp_ctx);
3271                 return -1;      
3272         }
3273
3274         if (reclockfile == NULL) {
3275                 if (ctdb->recovery_lock_file != NULL) {
3276                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3277                         talloc_free(ctdb->recovery_lock_file);
3278                         ctdb->recovery_lock_file = NULL;
3279                         if (ctdb->recovery_lock_fd != -1) {
3280                                 close(ctdb->recovery_lock_fd);
3281                                 ctdb->recovery_lock_fd = -1;
3282                         }
3283                 }
3284                 ctdb->tunable.verify_recovery_lock = 0;
3285                 talloc_free(tmp_ctx);
3286                 return 0;
3287         }
3288
3289         if (ctdb->recovery_lock_file == NULL) {
3290                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3291                 if (ctdb->recovery_lock_fd != -1) {
3292                         close(ctdb->recovery_lock_fd);
3293                         ctdb->recovery_lock_fd = -1;
3294                 }
3295                 talloc_free(tmp_ctx);
3296                 return 0;
3297         }
3298
3299
3300         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3301                 talloc_free(tmp_ctx);
3302                 return 0;
3303         }
3304
3305         talloc_free(ctdb->recovery_lock_file);
3306         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3307         ctdb->tunable.verify_recovery_lock = 0;
3308         if (ctdb->recovery_lock_fd != -1) {
3309                 close(ctdb->recovery_lock_fd);
3310                 ctdb->recovery_lock_fd = -1;
3311         }
3312
3313         talloc_free(tmp_ctx);
3314         return 0;
3315 }
3316
3317 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3318                       TALLOC_CTX *mem_ctx)
3319 {
3320         uint32_t pnn;
3321         struct ctdb_node_map *nodemap=NULL;
3322         struct ctdb_node_map *recmaster_nodemap=NULL;
3323         struct ctdb_node_map **remote_nodemaps=NULL;
3324         struct ctdb_vnn_map *vnnmap=NULL;
3325         struct ctdb_vnn_map *remote_vnnmap=NULL;
3326         int32_t debug_level;
3327         int i, j, ret;
3328
3329
3330
3331         /* verify that the main daemon is still running */
3332         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3333                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3334                 exit(-1);
3335         }
3336
3337         /* ping the local daemon to tell it we are alive */
3338         ctdb_ctrl_recd_ping(ctdb);
3339
3340         if (rec->election_timeout) {
3341                 /* an election is in progress */
3342                 return;
3343         }
3344
3345         /* read the debug level from the parent and update locally */
3346         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3347         if (ret !=0) {
3348                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3349                 return;
3350         }
3351         LogLevel = debug_level;
3352
3353
3354         /* We must check if we need to ban a node here but we want to do this
3355            as early as possible so we dont wait until we have pulled the node
3356            map from the local node. thats why we have the hardcoded value 20
3357         */
3358         for (i=0; i<ctdb->num_nodes; i++) {
3359                 struct ctdb_banning_state *ban_state;
3360
3361                 if (ctdb->nodes[i]->ban_state == NULL) {
3362                         continue;
3363                 }
3364                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
3365                 if (ban_state->count < 20) {
3366                         continue;
3367                 }
3368                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
3369                         ctdb->nodes[i]->pnn, ban_state->count,
3370                         ctdb->tunable.recovery_ban_period));
3371                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
3372                 ban_state->count = 0;
3373         }
3374
3375         /* get relevant tunables */
3376         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3377         if (ret != 0) {
3378                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3379                 return;
3380         }
3381
3382         /* get the current recovery lock file from the server */
3383         if (update_recovery_lock_file(ctdb) != 0) {
3384                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3385                 return;
3386         }
3387
3388         /* Make sure that if recovery lock verification becomes disabled when
3389            we close the file
3390         */
3391         if (ctdb->tunable.verify_recovery_lock == 0) {
3392                 if (ctdb->recovery_lock_fd != -1) {
3393                         close(ctdb->recovery_lock_fd);
3394                         ctdb->recovery_lock_fd = -1;
3395                 }
3396         }
3397
3398         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3399         if (pnn == (uint32_t)-1) {
3400                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
3401                 return;
3402         }
3403
3404         /* get the vnnmap */
3405         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3406         if (ret != 0) {
3407                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3408                 return;
3409         }
3410
3411
3412         /* get number of nodes */
3413         if (rec->nodemap) {
3414                 talloc_free(rec->nodemap);
3415                 rec->nodemap = NULL;
3416                 nodemap=NULL;
3417         }
3418         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3419         if (ret != 0) {
3420                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3421                 return;
3422         }
3423         nodemap = rec->nodemap;
3424
3425         /* remember our own node flags */
3426         rec->node_flags = nodemap->nodes[pnn].flags;
3427
3428         /* update the capabilities for all nodes */
3429         ret = update_capabilities(ctdb, nodemap);
3430         if (ret != 0) {
3431                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3432                 return;
3433         }
3434
3435         /* check which node is the recovery master */
3436         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3437         if (ret != 0) {
3438                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3439                 return;
3440         }
3441
3442         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3443         if (rec->recmaster != pnn) {
3444                 if (rec->ip_reallocate_ctx != NULL) {
3445                         talloc_free(rec->ip_reallocate_ctx);
3446                         rec->ip_reallocate_ctx = NULL;
3447                         rec->reallocate_callers = NULL;
3448                 }
3449         }
3450
3451         if (rec->recmaster == (uint32_t)-1) {
3452                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3453                 force_election(rec, pnn, nodemap);
3454                 return;
3455         }
3456
3457         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3458            also frozen and thet the recmode is set to active.
3459         */
3460         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3461                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3462                 if (ret != 0) {
3463                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3464                 }
3465                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3466                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3467
3468                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3469                         if (ret != 0) {
3470                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3471                                 return;
3472                         }
3473                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3474                         if (ret != 0) {
3475                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3476
3477                                 return;
3478                         }
3479                 }
3480
3481                 /* If this node is stopped or banned then it is not the recovery
3482                  * master, so don't do anything. This prevents stopped or banned
3483                  * node from starting election and sending unnecessary controls.
3484                  */
3485                 return;
3486         }
3487
3488         /* If the local node is stopped, verify we are not the recmaster 
3489            and yield this role if so
3490         */
3491         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE) && (rec->recmaster == pnn)) {
3492                 DEBUG(DEBUG_ERR,("Local node is INACTIVE. Yielding recmaster role\n"));
3493                 force_election(rec, pnn, nodemap);
3494                 return;
3495         }
3496         
3497         /*
3498          * if the current recmaster do not have CTDB_CAP_RECMASTER,
3499          * but we have force an election and try to become the new
3500          * recmaster
3501          */
3502         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3503             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3504              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3505                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3506                                   " but we (node %u) have - force an election\n",
3507                                   rec->recmaster, pnn));
3508                 force_election(rec, pnn, nodemap);
3509                 return;
3510         }
3511
3512         /* count how many active nodes there are */
3513         rec->num_active    = 0;
3514         rec->num_connected = 0;
3515         for (i=0; i<nodemap->num; i++) {
3516                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3517                         rec->num_active++;
3518                 }
3519                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3520                         rec->num_connected++;
3521                 }
3522         }
3523
3524
3525         /* verify that the recmaster node is still active */
3526         for (j=0; j<nodemap->num; j++) {
3527                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3528                         break;
3529                 }
3530         }
3531
3532         if (j == nodemap->num) {
3533                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3534                 force_election(rec, pnn, nodemap);
3535                 return;
3536         }
3537
3538         /* if recovery master is disconnected we must elect a new recmaster */
3539         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3540                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3541                 force_election(rec, pnn, nodemap);
3542                 return;
3543         }
3544
3545         /* get nodemap from the recovery master to check if it is inactive */
3546         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3547                                    mem_ctx, &recmaster_nodemap);
3548         if (ret != 0) {
3549                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3550                           nodemap->nodes[j].pnn));
3551                 return;
3552         }
3553
3554
3555         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3556             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3557                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3558                 force_election(rec, pnn, nodemap);
3559                 return;
3560         }
3561
3562         /* verify that we have all ip addresses we should have and we dont
3563          * have addresses we shouldnt have.
3564          */ 
3565         if (ctdb->tunable.disable_ip_failover == 0) {
3566                 if (rec->ip_check_disable_ctx == NULL) {
3567                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3568                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3569                         }
3570                 }
3571         }
3572
3573
3574         /* if we are not the recmaster then we do not need to check
3575            if recovery is needed
3576          */
3577         if (pnn != rec->recmaster) {
3578                 return;
3579         }
3580
3581
3582         /* ensure our local copies of flags are right */
3583         ret = update_local_flags(rec, nodemap);
3584         if (ret == MONITOR_ELECTION_NEEDED) {
3585                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3586                 force_election(rec, pnn, nodemap);
3587                 return;
3588         }
3589         if (ret != MONITOR_OK) {
3590                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3591                 return;
3592         }
3593
3594         if (ctdb->num_nodes != nodemap->num) {
3595                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3596                 reload_nodes_file(ctdb);
3597                 return;
3598         }
3599
3600         /* verify that all active nodes agree that we are the recmaster */
3601         switch (verify_recmaster(rec, nodemap, pnn)) {
3602         case MONITOR_RECOVERY_NEEDED:
3603                 /* can not happen */
3604                 return;
3605         case MONITOR_ELECTION_NEEDED:
3606                 force_election(rec, pnn, nodemap);
3607                 return;
3608         case MONITOR_OK:
3609                 break;
3610         case MONITOR_FAILED:
3611                 return;
3612         }
3613
3614
3615         if (rec->need_recovery) {
3616                 /* a previous recovery didn't finish */
3617                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3618                 return;
3619         }
3620
3621         /* verify that all active nodes are in normal mode 
3622            and not in recovery mode 
3623         */
3624         switch (verify_recmode(ctdb, nodemap)) {
3625         case MONITOR_RECOVERY_NEEDED:
3626                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3627                 return;
3628         case MONITOR_FAILED:
3629                 return;
3630         case MONITOR_ELECTION_NEEDED:
3631                 /* can not happen */
3632         case MONITOR_OK:
3633                 break;
3634         }
3635
3636
3637         if (ctdb->tunable.verify_recovery_lock != 0) {
3638                 /* we should have the reclock - check its not stale */
3639                 ret = check_recovery_lock(ctdb);
3640                 if (ret != 0) {
3641                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3642                         ctdb_set_culprit(rec, ctdb->pnn);
3643                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3644                         return;
3645                 }
3646         }
3647
3648
3649         /* is there a pending reload all ips ? */
3650         if (reload_all_ips_request != NULL) {
3651                 reload_all_ips(ctdb, rec, nodemap, reload_all_ips_request);
3652                 talloc_free(reload_all_ips_request);
3653                 reload_all_ips_request = NULL;
3654         }
3655
3656         /* if there are takeovers requested, perform it and notify the waiters */
3657         if (rec->reallocate_callers) {
3658                 process_ipreallocate_requests(ctdb, rec);
3659         }
3660
3661         /* get the nodemap for all active remote nodes
3662          */
3663         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3664         if (remote_nodemaps == NULL) {
3665                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3666                 return;
3667         }
3668         for(i=0; i<nodemap->num; i++) {
3669                 remote_nodemaps[i] = NULL;
3670         }
3671         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3672                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3673                 return;
3674         } 
3675
3676         /* verify that all other nodes have the same nodemap as we have
3677         */
3678         for (j=0; j<nodemap->num; j++) {
3679                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3680                         continue;
3681                 }
3682
3683                 if (remote_nodemaps[j] == NULL) {
3684                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3685                         ctdb_set_culprit(rec, j);
3686
3687                         return;
3688                 }
3689
3690                 /* if the nodes disagree on how many nodes there are
3691                    then this is a good reason to try recovery
3692                  */
3693                 if (remote_nodemaps[j]->num != nodemap->num) {
3694                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3695                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3696                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3697                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3698                         return;
3699                 }
3700
3701                 /* if the nodes disagree on which nodes exist and are
3702                    active, then that is also a good reason to do recovery
3703                  */
3704                 for (i=0;i<nodemap->num;i++) {
3705                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3706                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3707                                           nodemap->nodes[j].pnn, i, 
3708                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3709                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3710                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3711                                             vnnmap);
3712                                 return;
3713                         }
3714                 }
3715
3716                 /* verify the flags are consistent
3717                 */
3718                 for (i=0; i<nodemap->num; i++) {
3719                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3720                                 continue;
3721                         }
3722                         
3723                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3724                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3725                                   nodemap->nodes[j].pnn, 
3726                                   nodemap->nodes[i].pnn, 
3727                                   remote_nodemaps[j]->nodes[i].flags,
3728                                   nodemap->nodes[i].flags));
3729                                 if (i == j) {
3730                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3731                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3732                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3733                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3734                                                     vnnmap);
3735                                         return;
3736                                 } else {
3737                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3738                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3739                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3740                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3741                                                     vnnmap);
3742                                         return;
3743                                 }
3744                         }
3745                 }
3746         }
3747
3748
3749         /* there better be the same number of lmasters in the vnn map
3750            as there are active nodes or we will have to do a recovery
3751          */
3752         if (vnnmap->size != rec->num_active) {
3753                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3754                           vnnmap->size, rec->num_active));
3755                 ctdb_set_culprit(rec, ctdb->pnn);
3756                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3757                 return;
3758         }
3759
3760         /* verify that all active nodes in the nodemap also exist in 
3761            the vnnmap.
3762          */
3763         for (j=0; j<nodemap->num; j++) {
3764                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3765                         continue;
3766                 }
3767                 if (nodemap->nodes[j].pnn == pnn) {
3768                         continue;
3769                 }
3770
3771                 for (i=0; i<vnnmap->size; i++) {
3772                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3773                                 break;
3774                         }
3775                 }
3776                 if (i == vnnmap->size) {
3777                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3778                                   nodemap->nodes[j].pnn));
3779                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3780                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3781                         return;
3782                 }
3783         }
3784
3785         
3786         /* verify that all other nodes have the same vnnmap
3787            and are from the same generation
3788          */
3789         for (j=0; j<nodemap->num; j++) {
3790                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3791                         continue;
3792                 }
3793                 if (nodemap->nodes[j].pnn == pnn) {
3794                         continue;
3795                 }
3796
3797                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3798                                           mem_ctx, &remote_vnnmap);
3799                 if (ret != 0) {
3800                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3801                                   nodemap->nodes[j].pnn));
3802                         return;
3803                 }
3804
3805                 /* verify the vnnmap generation is the same */
3806                 if (vnnmap->generation != remote_vnnmap->generation) {
3807                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3808                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3809                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3810                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3811                         return;
3812                 }
3813
3814                 /* verify the vnnmap size is the same */
3815                 if (vnnmap->size != remote_vnnmap->size) {
3816                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3817                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3818                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3819                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3820                         return;
3821                 }
3822
3823                 /* verify the vnnmap is the same */
3824                 for (i=0;i<vnnmap->size;i++) {
3825                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3826                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3827                                           nodemap->nodes[j].pnn));
3828                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3829                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3830                                             vnnmap);
3831                                 return;
3832                         }
3833                 }
3834         }
3835
3836         /* we might need to change who has what IP assigned */
3837         if (rec->need_takeover_run) {
3838                 uint32_t culprit = (uint32_t)-1;
3839
3840                 rec->need_takeover_run = false;
3841
3842                 /* update the list of public ips that a node can handle for
3843                    all connected nodes
3844                 */
3845                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3846                 if (ret != 0) {
3847                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3848                                          culprit));
3849                         rec->need_takeover_run = true;
3850                         return;
3851                 }
3852
3853                 /* execute the "startrecovery" event script on all nodes */
3854                 ret = run_startrecovery_eventscript(rec, nodemap);
3855                 if (ret!=0) {
3856                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3857                         ctdb_set_culprit(rec, ctdb->pnn);
3858                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3859                         return;
3860                 }
3861
3862                 /* If takeover run fails, then the offending nodes are
3863                  * assigned ban culprit counts. And we re-try takeover.
3864                  * If takeover run fails repeatedly, the node would get
3865                  * banned.
3866                  *
3867                  * If rec->need_takeover_run is not set to true at this
3868                  * failure, monitoring is disabled cluster-wide (via
3869                  * startrecovery eventscript) and will not get enabled.
3870                  */
3871                 ret = ctdb_takeover_run(ctdb, nodemap, takeover_fail_callback, rec);
3872                 if (ret != 0) {
3873                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Trying again\n"));
3874                         return;
3875                 }
3876
3877                 /* execute the "recovered" event script on all nodes */
3878                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
3879 #if 0
3880 // we cant check whether the event completed successfully
3881 // since this script WILL fail if the node is in recovery mode
3882 // and if that race happens, the code here would just cause a second
3883 // cascading recovery.
3884                 if (ret!=0) {
3885                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3886                         ctdb_set_culprit(rec, ctdb->pnn);
3887                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3888                 }
3889 #endif
3890         }
3891 }
3892
3893 /*
3894   the main monitoring loop
3895  */
3896 static void monitor_cluster(struct ctdb_context *ctdb)
3897 {
3898         struct ctdb_recoverd *rec;
3899
3900         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3901
3902         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3903         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3904
3905         rec->ctdb = ctdb;
3906
3907         rec->priority_time = timeval_current();
3908
3909         /* register a message port for sending memory dumps */
3910         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3911
3912         /* register a message port for requesting logs */
3913         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_GETLOG, getlog_handler, rec);
3914
3915         /* register a message port for clearing logs */
3916         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_CLEARLOG, clearlog_handler, rec);
3917
3918         /* register a message port for recovery elections */
3919         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3920
3921         /* when nodes are disabled/enabled */
3922         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3923
3924         /* when we are asked to puch out a flag change */
3925         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3926
3927         /* register a message port for vacuum fetch */
3928         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3929
3930         /* register a message port for reloadnodes  */
3931         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3932
3933         /* register a message port for performing a takeover run */
3934         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3935
3936         /* register a message port for performing a reload all ips */
3937         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_ALL_IPS, ip_reloadall_handler, rec);
3938
3939         /* register a message port for disabling the ip check for a short while */
3940         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3941
3942         /* register a message port for updating the recovery daemons node assignment for an ip */
3943         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3944
3945         /* register a message port for forcing a rebalance of a node next
3946            reallocation */
3947         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3948
3949         for (;;) {
3950                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3951                 struct timeval start;
3952                 double elapsed;
3953
3954                 if (!mem_ctx) {
3955                         DEBUG(DEBUG_CRIT,(__location__
3956                                           " Failed to create temp context\n"));
3957                         exit(-1);
3958                 }
3959
3960                 start = timeval_current();
3961                 main_loop(ctdb, rec, mem_ctx);
3962                 talloc_free(mem_ctx);
3963
3964                 /* we only check for recovery once every second */
3965                 elapsed = timeval_elapsed(&start);
3966                 if (elapsed < ctdb->tunable.recover_interval) {
3967                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3968                                           - elapsed);
3969                 }
3970         }
3971 }
3972
3973 /*
3974   event handler for when the main ctdbd dies
3975  */
3976 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3977                                  uint16_t flags, void *private_data)
3978 {
3979         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3980         _exit(1);
3981 }
3982
3983 /*
3984   called regularly to verify that the recovery daemon is still running
3985  */
3986 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3987                               struct timeval yt, void *p)
3988 {
3989         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3990
3991         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3992                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3993
3994                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3995                                 ctdb_restart_recd, ctdb);
3996
3997                 return;
3998         }
3999
4000         event_add_timed(ctdb->ev, ctdb->recd_ctx,
4001                         timeval_current_ofs(30, 0),
4002                         ctdb_check_recd, ctdb);
4003 }
4004
4005 static void recd_sig_child_handler(struct event_context *ev,
4006         struct signal_event *se, int signum, int count,
4007         void *dont_care, 
4008         void *private_data)
4009 {
4010 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4011         int status;
4012         pid_t pid = -1;
4013
4014         while (pid != 0) {
4015                 pid = waitpid(-1, &status, WNOHANG);
4016                 if (pid == -1) {
4017                         if (errno != ECHILD) {
4018                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4019                         }
4020                         return;
4021                 }
4022                 if (pid > 0) {
4023                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4024                 }
4025         }
4026 }
4027
4028 /*
4029   startup the recovery daemon as a child of the main ctdb daemon
4030  */
4031 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4032 {
4033         int fd[2];
4034         struct signal_event *se;
4035         struct tevent_fd *fde;
4036
4037         if (pipe(fd) != 0) {
4038                 return -1;
4039         }
4040
4041         ctdb->ctdbd_pid = getpid();
4042
4043         ctdb->recoverd_pid = ctdb_fork_no_free_ringbuffer(ctdb);
4044         if (ctdb->recoverd_pid == -1) {
4045                 return -1;
4046         }
4047
4048         if (ctdb->recoverd_pid != 0) {
4049                 talloc_free(ctdb->recd_ctx);
4050                 ctdb->recd_ctx = talloc_new(ctdb);
4051                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4052
4053                 close(fd[0]);
4054                 event_add_timed(ctdb->ev, ctdb->recd_ctx,
4055                                 timeval_current_ofs(30, 0),
4056                                 ctdb_check_recd, ctdb);
4057                 return 0;
4058         }
4059
4060         close(fd[1]);
4061
4062         srandom(getpid() ^ time(NULL));
4063
4064         /* Clear the log ringbuffer */
4065         ctdb_clear_log(ctdb);
4066
4067         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4068                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
4069                 exit(1);
4070         }
4071
4072         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4073
4074         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
4075                      ctdb_recoverd_parent, &fd[0]);
4076         tevent_fd_set_auto_close(fde);
4077
4078         /* set up a handler to pick up sigchld */
4079         se = event_add_signal(ctdb->ev, ctdb,
4080                                      SIGCHLD, 0,
4081                                      recd_sig_child_handler,
4082                                      ctdb);
4083         if (se == NULL) {
4084                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4085                 exit(1);
4086         }
4087
4088         monitor_cluster(ctdb);
4089
4090         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4091         return -1;
4092 }
4093
4094 /*
4095   shutdown the recovery daemon
4096  */
4097 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4098 {
4099         if (ctdb->recoverd_pid == 0) {
4100                 return;
4101         }
4102
4103         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4104         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4105
4106         TALLOC_FREE(ctdb->recd_ctx);
4107         TALLOC_FREE(ctdb->recd_ping_count);
4108 }
4109
4110 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
4111                        struct timeval t, void *private_data)
4112 {
4113         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4114
4115         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4116         ctdb_stop_recoverd(ctdb);
4117         ctdb_start_recoverd(ctdb);
4118 }