recoverd: Fix an incorrect comment
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "dlinklist.h"
31
32
33 /* most recent reload all ips request we need to perform during the 
34    next monitoring loop
35 */
36 struct reloadips_all_reply *reload_all_ips_request = NULL;
37
38 /* list of "ctdb ipreallocate" processes to call back when we have
39    finished the takeover run.
40 */
41 struct ip_reallocate_list {
42         struct ip_reallocate_list *next;
43         struct rd_memdump_reply *rd;
44 };
45
46 struct ctdb_banning_state {
47         uint32_t count;
48         struct timeval last_reported_time;
49 };
50
51 /*
52   private state of recovery daemon
53  */
54 struct ctdb_recoverd {
55         struct ctdb_context *ctdb;
56         uint32_t recmaster;
57         uint32_t num_active;
58         uint32_t num_connected;
59         uint32_t last_culprit_node;
60         struct ctdb_node_map *nodemap;
61         struct timeval priority_time;
62         bool need_takeover_run;
63         bool need_recovery;
64         uint32_t node_flags;
65         struct timed_event *send_election_te;
66         struct timed_event *election_timeout;
67         struct vacuum_info *vacuum_info;
68         TALLOC_CTX *ip_reallocate_ctx;
69         struct ip_reallocate_list *reallocate_callers;
70         TALLOC_CTX *ip_check_disable_ctx;
71         struct ctdb_control_get_ifaces *ifaces;
72         TALLOC_CTX *deferred_rebalance_ctx;
73 };
74
75 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
76 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
77
78 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
79
80 /*
81   ban a node for a period of time
82  */
83 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
84 {
85         int ret;
86         struct ctdb_context *ctdb = rec->ctdb;
87         struct ctdb_ban_time bantime;
88        
89         if (!ctdb_validate_pnn(ctdb, pnn)) {
90                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
91                 return;
92         }
93
94         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
95
96         bantime.pnn  = pnn;
97         bantime.time = ban_time;
98
99         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
100         if (ret != 0) {
101                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
102                 return;
103         }
104
105 }
106
107 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
108
109
110 /*
111   remember the trouble maker
112  */
113 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
114 {
115         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
116         struct ctdb_banning_state *ban_state;
117
118         if (culprit > ctdb->num_nodes) {
119                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
120                 return;
121         }
122
123         /* If we are banned or stopped, do not set other nodes as culprits */
124         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
125                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
126                 return;
127         }
128
129         if (ctdb->nodes[culprit]->ban_state == NULL) {
130                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
131                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
132
133                 
134         }
135         ban_state = ctdb->nodes[culprit]->ban_state;
136         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
137                 /* this was the first time in a long while this node
138                    misbehaved so we will forgive any old transgressions.
139                 */
140                 ban_state->count = 0;
141         }
142
143         ban_state->count += count;
144         ban_state->last_reported_time = timeval_current();
145         rec->last_culprit_node = culprit;
146 }
147
148 /*
149   remember the trouble maker
150  */
151 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
152 {
153         ctdb_set_culprit_count(rec, culprit, 1);
154 }
155
156
157 /* this callback is called for every node that failed to execute the
158    recovered event
159 */
160 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
161 {
162         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
163
164         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
165
166         ctdb_set_culprit(rec, node_pnn);
167 }
168
169 /*
170   run the "recovered" eventscript on all nodes
171  */
172 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
173 {
174         TALLOC_CTX *tmp_ctx;
175         uint32_t *nodes;
176         struct ctdb_context *ctdb = rec->ctdb;
177
178         tmp_ctx = talloc_new(ctdb);
179         CTDB_NO_MEMORY(ctdb, tmp_ctx);
180
181         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
182         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
183                                         nodes, 0,
184                                         CONTROL_TIMEOUT(), false, tdb_null,
185                                         NULL, recovered_fail_callback,
186                                         rec) != 0) {
187                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
188
189                 talloc_free(tmp_ctx);
190                 return -1;
191         }
192
193         talloc_free(tmp_ctx);
194         return 0;
195 }
196
197 /* this callback is called for every node that failed to execute the
198    start recovery event
199 */
200 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
201 {
202         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
203
204         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
205
206         ctdb_set_culprit(rec, node_pnn);
207 }
208
209 /*
210   run the "startrecovery" eventscript on all nodes
211  */
212 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
213 {
214         TALLOC_CTX *tmp_ctx;
215         uint32_t *nodes;
216         struct ctdb_context *ctdb = rec->ctdb;
217
218         tmp_ctx = talloc_new(ctdb);
219         CTDB_NO_MEMORY(ctdb, tmp_ctx);
220
221         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
222         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
223                                         nodes, 0,
224                                         CONTROL_TIMEOUT(), false, tdb_null,
225                                         NULL,
226                                         startrecovery_fail_callback,
227                                         rec) != 0) {
228                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
229                 talloc_free(tmp_ctx);
230                 return -1;
231         }
232
233         talloc_free(tmp_ctx);
234         return 0;
235 }
236
237 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
238 {
239         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
240                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
241                 return;
242         }
243         if (node_pnn < ctdb->num_nodes) {
244                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
245         }
246
247         if (node_pnn == ctdb->pnn) {
248                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
249         }
250 }
251
252 /*
253   update the node capabilities for all connected nodes
254  */
255 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
256 {
257         uint32_t *nodes;
258         TALLOC_CTX *tmp_ctx;
259
260         tmp_ctx = talloc_new(ctdb);
261         CTDB_NO_MEMORY(ctdb, tmp_ctx);
262
263         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
264         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
265                                         nodes, 0,
266                                         CONTROL_TIMEOUT(),
267                                         false, tdb_null,
268                                         async_getcap_callback, NULL,
269                                         NULL) != 0) {
270                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
271                 talloc_free(tmp_ctx);
272                 return -1;
273         }
274
275         talloc_free(tmp_ctx);
276         return 0;
277 }
278
279 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
280 {
281         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
282
283         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
284         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
285 }
286
287 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
288 {
289         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
290
291         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
292         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
293 }
294
295 /*
296   change recovery mode on all nodes
297  */
298 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
299 {
300         TDB_DATA data;
301         uint32_t *nodes;
302         TALLOC_CTX *tmp_ctx;
303
304         tmp_ctx = talloc_new(ctdb);
305         CTDB_NO_MEMORY(ctdb, tmp_ctx);
306
307         /* freeze all nodes */
308         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
309         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
310                 int i;
311
312                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
313                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
314                                                 nodes, i,
315                                                 CONTROL_TIMEOUT(),
316                                                 false, tdb_null,
317                                                 NULL,
318                                                 set_recmode_fail_callback,
319                                                 rec) != 0) {
320                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
321                                 talloc_free(tmp_ctx);
322                                 return -1;
323                         }
324                 }
325         }
326
327
328         data.dsize = sizeof(uint32_t);
329         data.dptr = (unsigned char *)&rec_mode;
330
331         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
332                                         nodes, 0,
333                                         CONTROL_TIMEOUT(),
334                                         false, data,
335                                         NULL, NULL,
336                                         NULL) != 0) {
337                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
338                 talloc_free(tmp_ctx);
339                 return -1;
340         }
341
342         talloc_free(tmp_ctx);
343         return 0;
344 }
345
346 /*
347   change recovery master on all node
348  */
349 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
350 {
351         TDB_DATA data;
352         TALLOC_CTX *tmp_ctx;
353         uint32_t *nodes;
354
355         tmp_ctx = talloc_new(ctdb);
356         CTDB_NO_MEMORY(ctdb, tmp_ctx);
357
358         data.dsize = sizeof(uint32_t);
359         data.dptr = (unsigned char *)&pnn;
360
361         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
362         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
363                                         nodes, 0,
364                                         CONTROL_TIMEOUT(), false, data,
365                                         NULL, NULL,
366                                         NULL) != 0) {
367                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
368                 talloc_free(tmp_ctx);
369                 return -1;
370         }
371
372         talloc_free(tmp_ctx);
373         return 0;
374 }
375
376 /* update all remote nodes to use the same db priority that we have
377    this can fail if the remove node has not yet been upgraded to 
378    support this function, so we always return success and never fail
379    a recovery if this call fails.
380 */
381 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
382         struct ctdb_node_map *nodemap, 
383         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
384 {
385         int db;
386         uint32_t *nodes;
387
388         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
389
390         /* step through all local databases */
391         for (db=0; db<dbmap->num;db++) {
392                 TDB_DATA data;
393                 struct ctdb_db_priority db_prio;
394                 int ret;
395
396                 db_prio.db_id     = dbmap->dbs[db].dbid;
397                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
398                 if (ret != 0) {
399                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
400                         continue;
401                 }
402
403                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
404
405                 data.dptr  = (uint8_t *)&db_prio;
406                 data.dsize = sizeof(db_prio);
407
408                 if (ctdb_client_async_control(ctdb,
409                                         CTDB_CONTROL_SET_DB_PRIORITY,
410                                         nodes, 0,
411                                         CONTROL_TIMEOUT(), false, data,
412                                         NULL, NULL,
413                                         NULL) != 0) {
414                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
415                 }
416         }
417
418         return 0;
419 }                       
420
421 /*
422   ensure all other nodes have attached to any databases that we have
423  */
424 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
425                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
426 {
427         int i, j, db, ret;
428         struct ctdb_dbid_map *remote_dbmap;
429
430         /* verify that all other nodes have all our databases */
431         for (j=0; j<nodemap->num; j++) {
432                 /* we dont need to ourself ourselves */
433                 if (nodemap->nodes[j].pnn == pnn) {
434                         continue;
435                 }
436                 /* dont check nodes that are unavailable */
437                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
438                         continue;
439                 }
440
441                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
442                                          mem_ctx, &remote_dbmap);
443                 if (ret != 0) {
444                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
445                         return -1;
446                 }
447
448                 /* step through all local databases */
449                 for (db=0; db<dbmap->num;db++) {
450                         const char *name;
451
452
453                         for (i=0;i<remote_dbmap->num;i++) {
454                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
455                                         break;
456                                 }
457                         }
458                         /* the remote node already have this database */
459                         if (i!=remote_dbmap->num) {
460                                 continue;
461                         }
462                         /* ok so we need to create this database */
463                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
464                                             mem_ctx, &name);
465                         if (ret != 0) {
466                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
467                                 return -1;
468                         }
469                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
470                                            mem_ctx, name,
471                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
472                         if (ret != 0) {
473                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
474                                 return -1;
475                         }
476                 }
477         }
478
479         return 0;
480 }
481
482
483 /*
484   ensure we are attached to any databases that anyone else is attached to
485  */
486 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
487                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
488 {
489         int i, j, db, ret;
490         struct ctdb_dbid_map *remote_dbmap;
491
492         /* verify that we have all database any other node has */
493         for (j=0; j<nodemap->num; j++) {
494                 /* we dont need to ourself ourselves */
495                 if (nodemap->nodes[j].pnn == pnn) {
496                         continue;
497                 }
498                 /* dont check nodes that are unavailable */
499                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
500                         continue;
501                 }
502
503                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
504                                          mem_ctx, &remote_dbmap);
505                 if (ret != 0) {
506                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
507                         return -1;
508                 }
509
510                 /* step through all databases on the remote node */
511                 for (db=0; db<remote_dbmap->num;db++) {
512                         const char *name;
513
514                         for (i=0;i<(*dbmap)->num;i++) {
515                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
516                                         break;
517                                 }
518                         }
519                         /* we already have this db locally */
520                         if (i!=(*dbmap)->num) {
521                                 continue;
522                         }
523                         /* ok so we need to create this database and
524                            rebuild dbmap
525                          */
526                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
527                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
528                         if (ret != 0) {
529                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
530                                           nodemap->nodes[j].pnn));
531                                 return -1;
532                         }
533                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
534                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
535                         if (ret != 0) {
536                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
537                                 return -1;
538                         }
539                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
540                         if (ret != 0) {
541                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
542                                 return -1;
543                         }
544                 }
545         }
546
547         return 0;
548 }
549
550
551 /*
552   pull the remote database contents from one node into the recdb
553  */
554 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
555                                     struct tdb_wrap *recdb, uint32_t dbid)
556 {
557         int ret;
558         TDB_DATA outdata;
559         struct ctdb_marshall_buffer *reply;
560         struct ctdb_rec_data *rec;
561         int i;
562         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
563
564         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
565                                CONTROL_TIMEOUT(), &outdata);
566         if (ret != 0) {
567                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
568                 talloc_free(tmp_ctx);
569                 return -1;
570         }
571
572         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
573
574         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
575                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
576                 talloc_free(tmp_ctx);
577                 return -1;
578         }
579         
580         rec = (struct ctdb_rec_data *)&reply->data[0];
581         
582         for (i=0;
583              i<reply->count;
584              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
585                 TDB_DATA key, data;
586                 struct ctdb_ltdb_header *hdr;
587                 TDB_DATA existing;
588                 
589                 key.dptr = &rec->data[0];
590                 key.dsize = rec->keylen;
591                 data.dptr = &rec->data[key.dsize];
592                 data.dsize = rec->datalen;
593                 
594                 hdr = (struct ctdb_ltdb_header *)data.dptr;
595
596                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
597                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
598                         talloc_free(tmp_ctx);
599                         return -1;
600                 }
601
602                 /* fetch the existing record, if any */
603                 existing = tdb_fetch(recdb->tdb, key);
604                 
605                 if (existing.dptr != NULL) {
606                         struct ctdb_ltdb_header header;
607                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
608                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
609                                          (unsigned)existing.dsize, srcnode));
610                                 free(existing.dptr);
611                                 talloc_free(tmp_ctx);
612                                 return -1;
613                         }
614                         header = *(struct ctdb_ltdb_header *)existing.dptr;
615                         free(existing.dptr);
616                         if (!(header.rsn < hdr->rsn ||
617                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
618                                 continue;
619                         }
620                 }
621                 
622                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
623                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
624                         talloc_free(tmp_ctx);
625                         return -1;                              
626                 }
627         }
628
629         talloc_free(tmp_ctx);
630
631         return 0;
632 }
633
634
635 struct pull_seqnum_cbdata {
636         int failed;
637         uint32_t pnn;
638         uint64_t seqnum;
639 };
640
641 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
642 {
643         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
644         uint64_t seqnum;
645
646         if (cb_data->failed != 0) {
647                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
648                 return;
649         }
650
651         if (res != 0) {
652                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
653                 cb_data->failed = 1;
654                 return;
655         }
656
657         if (outdata.dsize != sizeof(uint64_t)) {
658                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
659                 cb_data->failed = -1;
660                 return;
661         }
662
663         seqnum = *((uint64_t *)outdata.dptr);
664
665         if (seqnum > cb_data->seqnum) {
666                 cb_data->seqnum = seqnum;
667                 cb_data->pnn = node_pnn;
668         }
669 }
670
671 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
672 {
673         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
674
675         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
676         cb_data->failed = 1;
677 }
678
679 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
680                                 struct ctdb_recoverd *rec, 
681                                 struct ctdb_node_map *nodemap, 
682                                 struct tdb_wrap *recdb, uint32_t dbid)
683 {
684         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
685         uint32_t *nodes;
686         TDB_DATA data;
687         uint32_t outdata[2];
688         struct pull_seqnum_cbdata *cb_data;
689
690         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
691
692         outdata[0] = dbid;
693         outdata[1] = 0;
694
695         data.dsize = sizeof(outdata);
696         data.dptr  = (uint8_t *)&outdata[0];
697
698         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
699         if (cb_data == NULL) {
700                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
701                 talloc_free(tmp_ctx);
702                 return -1;
703         }
704
705         cb_data->failed = 0;
706         cb_data->pnn    = -1;
707         cb_data->seqnum = 0;
708         
709         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
710         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
711                                         nodes, 0,
712                                         CONTROL_TIMEOUT(), false, data,
713                                         pull_seqnum_cb,
714                                         pull_seqnum_fail_cb,
715                                         cb_data) != 0) {
716                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
717
718                 talloc_free(tmp_ctx);
719                 return -1;
720         }
721
722         if (cb_data->failed != 0) {
723                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
724                 talloc_free(tmp_ctx);
725                 return -1;
726         }
727
728         if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
729                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
730                 talloc_free(tmp_ctx);
731                 return -1;
732         }
733
734         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
735
736         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
737                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
738                 talloc_free(tmp_ctx);
739                 return -1;
740         }
741
742         talloc_free(tmp_ctx);
743         return 0;
744 }
745
746
747 /*
748   pull all the remote database contents into the recdb
749  */
750 static int pull_remote_database(struct ctdb_context *ctdb,
751                                 struct ctdb_recoverd *rec, 
752                                 struct ctdb_node_map *nodemap, 
753                                 struct tdb_wrap *recdb, uint32_t dbid,
754                                 bool persistent)
755 {
756         int j;
757
758         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
759                 int ret;
760                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
761                 if (ret == 0) {
762                         return 0;
763                 }
764         }
765
766         /* pull all records from all other nodes across onto this node
767            (this merges based on rsn)
768         */
769         for (j=0; j<nodemap->num; j++) {
770                 /* dont merge from nodes that are unavailable */
771                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
772                         continue;
773                 }
774                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
775                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
776                                  nodemap->nodes[j].pnn));
777                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
778                         return -1;
779                 }
780         }
781         
782         return 0;
783 }
784
785
786 /*
787   update flags on all active nodes
788  */
789 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
790 {
791         int ret;
792
793         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
794                 if (ret != 0) {
795                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
796                 return -1;
797         }
798
799         return 0;
800 }
801
802 /*
803   ensure all nodes have the same vnnmap we do
804  */
805 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
806                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
807 {
808         int j, ret;
809
810         /* push the new vnn map out to all the nodes */
811         for (j=0; j<nodemap->num; j++) {
812                 /* dont push to nodes that are unavailable */
813                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
814                         continue;
815                 }
816
817                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
818                 if (ret != 0) {
819                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
820                         return -1;
821                 }
822         }
823
824         return 0;
825 }
826
827
828 struct vacuum_info {
829         struct vacuum_info *next, *prev;
830         struct ctdb_recoverd *rec;
831         uint32_t srcnode;
832         struct ctdb_db_context *ctdb_db;
833         struct ctdb_marshall_buffer *recs;
834         struct ctdb_rec_data *r;
835 };
836
837 static void vacuum_fetch_next(struct vacuum_info *v);
838
839 /*
840   called when a vacuum fetch has completed - just free it and do the next one
841  */
842 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
843 {
844         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
845         talloc_free(state);
846         vacuum_fetch_next(v);
847 }
848
849
850 /*
851   process the next element from the vacuum list
852 */
853 static void vacuum_fetch_next(struct vacuum_info *v)
854 {
855         struct ctdb_call call;
856         struct ctdb_rec_data *r;
857
858         while (v->recs->count) {
859                 struct ctdb_client_call_state *state;
860                 TDB_DATA data;
861                 struct ctdb_ltdb_header *hdr;
862
863                 ZERO_STRUCT(call);
864                 call.call_id = CTDB_NULL_FUNC;
865                 call.flags = CTDB_IMMEDIATE_MIGRATION;
866                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
867
868                 r = v->r;
869                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
870                 v->recs->count--;
871
872                 call.key.dptr = &r->data[0];
873                 call.key.dsize = r->keylen;
874
875                 /* ensure we don't block this daemon - just skip a record if we can't get
876                    the chainlock */
877                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
878                         continue;
879                 }
880
881                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
882                 if (data.dptr == NULL) {
883                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
884                         continue;
885                 }
886
887                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
888                         free(data.dptr);
889                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
890                         continue;
891                 }
892                 
893                 hdr = (struct ctdb_ltdb_header *)data.dptr;
894                 if (hdr->dmaster == v->rec->ctdb->pnn) {
895                         /* its already local */
896                         free(data.dptr);
897                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
898                         continue;
899                 }
900
901                 free(data.dptr);
902
903                 state = ctdb_call_send(v->ctdb_db, &call);
904                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
905                 if (state == NULL) {
906                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
907                         talloc_free(v);
908                         return;
909                 }
910                 state->async.fn = vacuum_fetch_callback;
911                 state->async.private_data = v;
912                 return;
913         }
914
915         talloc_free(v);
916 }
917
918
919 /*
920   destroy a vacuum info structure
921  */
922 static int vacuum_info_destructor(struct vacuum_info *v)
923 {
924         DLIST_REMOVE(v->rec->vacuum_info, v);
925         return 0;
926 }
927
928
929 /*
930   handler for vacuum fetch
931 */
932 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
933                                  TDB_DATA data, void *private_data)
934 {
935         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
936         struct ctdb_marshall_buffer *recs;
937         int ret, i;
938         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
939         const char *name;
940         struct ctdb_dbid_map *dbmap=NULL;
941         bool persistent = false;
942         struct ctdb_db_context *ctdb_db;
943         struct ctdb_rec_data *r;
944         uint32_t srcnode;
945         struct vacuum_info *v;
946
947         recs = (struct ctdb_marshall_buffer *)data.dptr;
948         r = (struct ctdb_rec_data *)&recs->data[0];
949
950         if (recs->count == 0) {
951                 talloc_free(tmp_ctx);
952                 return;
953         }
954
955         srcnode = r->reqid;
956
957         for (v=rec->vacuum_info;v;v=v->next) {
958                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
959                         /* we're already working on records from this node */
960                         talloc_free(tmp_ctx);
961                         return;
962                 }
963         }
964
965         /* work out if the database is persistent */
966         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
967         if (ret != 0) {
968                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
969                 talloc_free(tmp_ctx);
970                 return;
971         }
972
973         for (i=0;i<dbmap->num;i++) {
974                 if (dbmap->dbs[i].dbid == recs->db_id) {
975                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
976                         break;
977                 }
978         }
979         if (i == dbmap->num) {
980                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
981                 talloc_free(tmp_ctx);
982                 return;         
983         }
984
985         /* find the name of this database */
986         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
987                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
988                 talloc_free(tmp_ctx);
989                 return;
990         }
991
992         /* attach to it */
993         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
994         if (ctdb_db == NULL) {
995                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
996                 talloc_free(tmp_ctx);
997                 return;
998         }
999
1000         v = talloc_zero(rec, struct vacuum_info);
1001         if (v == NULL) {
1002                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1003                 talloc_free(tmp_ctx);
1004                 return;
1005         }
1006
1007         v->rec = rec;
1008         v->srcnode = srcnode;
1009         v->ctdb_db = ctdb_db;
1010         v->recs = talloc_memdup(v, recs, data.dsize);
1011         if (v->recs == NULL) {
1012                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1013                 talloc_free(v);
1014                 talloc_free(tmp_ctx);
1015                 return;         
1016         }
1017         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
1018
1019         DLIST_ADD(rec->vacuum_info, v);
1020
1021         talloc_set_destructor(v, vacuum_info_destructor);
1022
1023         vacuum_fetch_next(v);
1024         talloc_free(tmp_ctx);
1025 }
1026
1027
1028 /*
1029   called when ctdb_wait_timeout should finish
1030  */
1031 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1032                               struct timeval yt, void *p)
1033 {
1034         uint32_t *timed_out = (uint32_t *)p;
1035         (*timed_out) = 1;
1036 }
1037
1038 /*
1039   wait for a given number of seconds
1040  */
1041 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1042 {
1043         uint32_t timed_out = 0;
1044         time_t usecs = (secs - (time_t)secs) * 1000000;
1045         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1046         while (!timed_out) {
1047                 event_loop_once(ctdb->ev);
1048         }
1049 }
1050
1051 /*
1052   called when an election times out (ends)
1053  */
1054 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1055                                   struct timeval t, void *p)
1056 {
1057         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1058         rec->election_timeout = NULL;
1059         fast_start = false;
1060
1061         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1062 }
1063
1064
1065 /*
1066   wait for an election to finish. It finished election_timeout seconds after
1067   the last election packet is received
1068  */
1069 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1070 {
1071         struct ctdb_context *ctdb = rec->ctdb;
1072         while (rec->election_timeout) {
1073                 event_loop_once(ctdb->ev);
1074         }
1075 }
1076
1077 /*
1078   Update our local flags from all remote connected nodes. 
1079   This is only run when we are or we belive we are the recovery master
1080  */
1081 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1082 {
1083         int j;
1084         struct ctdb_context *ctdb = rec->ctdb;
1085         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1086
1087         /* get the nodemap for all active remote nodes and verify
1088            they are the same as for this node
1089          */
1090         for (j=0; j<nodemap->num; j++) {
1091                 struct ctdb_node_map *remote_nodemap=NULL;
1092                 int ret;
1093
1094                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1095                         continue;
1096                 }
1097                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1098                         continue;
1099                 }
1100
1101                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1102                                            mem_ctx, &remote_nodemap);
1103                 if (ret != 0) {
1104                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1105                                   nodemap->nodes[j].pnn));
1106                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1107                         talloc_free(mem_ctx);
1108                         return MONITOR_FAILED;
1109                 }
1110                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1111                         /* We should tell our daemon about this so it
1112                            updates its flags or else we will log the same 
1113                            message again in the next iteration of recovery.
1114                            Since we are the recovery master we can just as
1115                            well update the flags on all nodes.
1116                         */
1117                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1118                         if (ret != 0) {
1119                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1120                                 return -1;
1121                         }
1122
1123                         /* Update our local copy of the flags in the recovery
1124                            daemon.
1125                         */
1126                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1127                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1128                                  nodemap->nodes[j].flags));
1129                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1130                 }
1131                 talloc_free(remote_nodemap);
1132         }
1133         talloc_free(mem_ctx);
1134         return MONITOR_OK;
1135 }
1136
1137
1138 /* Create a new random generation ip. 
1139    The generation id can not be the INVALID_GENERATION id
1140 */
1141 static uint32_t new_generation(void)
1142 {
1143         uint32_t generation;
1144
1145         while (1) {
1146                 generation = random();
1147
1148                 if (generation != INVALID_GENERATION) {
1149                         break;
1150                 }
1151         }
1152
1153         return generation;
1154 }
1155
1156
1157 /*
1158   create a temporary working database
1159  */
1160 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1161 {
1162         char *name;
1163         struct tdb_wrap *recdb;
1164         unsigned tdb_flags;
1165
1166         /* open up the temporary recovery database */
1167         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1168                                ctdb->db_directory_state,
1169                                ctdb->pnn);
1170         if (name == NULL) {
1171                 return NULL;
1172         }
1173         unlink(name);
1174
1175         tdb_flags = TDB_NOLOCK;
1176         if (ctdb->valgrinding) {
1177                 tdb_flags |= TDB_NOMMAP;
1178         }
1179         tdb_flags |= TDB_DISALLOW_NESTING;
1180
1181         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1182                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1183         if (recdb == NULL) {
1184                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1185         }
1186
1187         talloc_free(name);
1188
1189         return recdb;
1190 }
1191
1192
1193 /* 
1194    a traverse function for pulling all relevant records from recdb
1195  */
1196 struct recdb_data {
1197         struct ctdb_context *ctdb;
1198         struct ctdb_marshall_buffer *recdata;
1199         uint32_t len;
1200         uint32_t allocated_len;
1201         bool failed;
1202         bool persistent;
1203 };
1204
1205 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1206 {
1207         struct recdb_data *params = (struct recdb_data *)p;
1208         struct ctdb_rec_data *rec;
1209         struct ctdb_ltdb_header *hdr;
1210
1211         /*
1212          * skip empty records - but NOT for persistent databases:
1213          *
1214          * The record-by-record mode of recovery deletes empty records.
1215          * For persistent databases, this can lead to data corruption
1216          * by deleting records that should be there:
1217          *
1218          * - Assume the cluster has been running for a while.
1219          *
1220          * - A record R in a persistent database has been created and
1221          *   deleted a couple of times, the last operation being deletion,
1222          *   leaving an empty record with a high RSN, say 10.
1223          *
1224          * - Now a node N is turned off.
1225          *
1226          * - This leaves the local database copy of D on N with the empty
1227          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1228          *   the copy of record R.
1229          *
1230          * - Now the record is created again while node N is turned off.
1231          *   This creates R with RSN = 1 on all nodes except for N.
1232          *
1233          * - Now node N is turned on again. The following recovery will chose
1234          *   the older empty copy of R due to RSN 10 > RSN 1.
1235          *
1236          * ==> Hence the record is gone after the recovery.
1237          *
1238          * On databases like Samba's registry, this can damage the higher-level
1239          * data structures built from the various tdb-level records.
1240          */
1241         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1242                 return 0;
1243         }
1244
1245         /* update the dmaster field to point to us */
1246         hdr = (struct ctdb_ltdb_header *)data.dptr;
1247         if (!params->persistent) {
1248                 hdr->dmaster = params->ctdb->pnn;
1249                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1250         }
1251
1252         /* add the record to the blob ready to send to the nodes */
1253         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1254         if (rec == NULL) {
1255                 params->failed = true;
1256                 return -1;
1257         }
1258         if (params->len + rec->length >= params->allocated_len) {
1259                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1260                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1261         }
1262         if (params->recdata == NULL) {
1263                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1264                          rec->length + params->len, params->recdata->count));
1265                 params->failed = true;
1266                 return -1;
1267         }
1268         params->recdata->count++;
1269         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1270         params->len += rec->length;
1271         talloc_free(rec);
1272
1273         return 0;
1274 }
1275
1276 /*
1277   push the recdb database out to all nodes
1278  */
1279 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1280                                bool persistent,
1281                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1282 {
1283         struct recdb_data params;
1284         struct ctdb_marshall_buffer *recdata;
1285         TDB_DATA outdata;
1286         TALLOC_CTX *tmp_ctx;
1287         uint32_t *nodes;
1288
1289         tmp_ctx = talloc_new(ctdb);
1290         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1291
1292         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1293         CTDB_NO_MEMORY(ctdb, recdata);
1294
1295         recdata->db_id = dbid;
1296
1297         params.ctdb = ctdb;
1298         params.recdata = recdata;
1299         params.len = offsetof(struct ctdb_marshall_buffer, data);
1300         params.allocated_len = params.len;
1301         params.failed = false;
1302         params.persistent = persistent;
1303
1304         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1305                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1306                 talloc_free(params.recdata);
1307                 talloc_free(tmp_ctx);
1308                 return -1;
1309         }
1310
1311         if (params.failed) {
1312                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1313                 talloc_free(params.recdata);
1314                 talloc_free(tmp_ctx);
1315                 return -1;              
1316         }
1317
1318         recdata = params.recdata;
1319
1320         outdata.dptr = (void *)recdata;
1321         outdata.dsize = params.len;
1322
1323         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1324         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1325                                         nodes, 0,
1326                                         CONTROL_TIMEOUT(), false, outdata,
1327                                         NULL, NULL,
1328                                         NULL) != 0) {
1329                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1330                 talloc_free(recdata);
1331                 talloc_free(tmp_ctx);
1332                 return -1;
1333         }
1334
1335         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1336                   dbid, recdata->count));
1337
1338         talloc_free(recdata);
1339         talloc_free(tmp_ctx);
1340
1341         return 0;
1342 }
1343
1344
1345 /*
1346   go through a full recovery on one database 
1347  */
1348 static int recover_database(struct ctdb_recoverd *rec, 
1349                             TALLOC_CTX *mem_ctx,
1350                             uint32_t dbid,
1351                             bool persistent,
1352                             uint32_t pnn, 
1353                             struct ctdb_node_map *nodemap,
1354                             uint32_t transaction_id)
1355 {
1356         struct tdb_wrap *recdb;
1357         int ret;
1358         struct ctdb_context *ctdb = rec->ctdb;
1359         TDB_DATA data;
1360         struct ctdb_control_wipe_database w;
1361         uint32_t *nodes;
1362
1363         recdb = create_recdb(ctdb, mem_ctx);
1364         if (recdb == NULL) {
1365                 return -1;
1366         }
1367
1368         /* pull all remote databases onto the recdb */
1369         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1370         if (ret != 0) {
1371                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1372                 return -1;
1373         }
1374
1375         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1376
1377         /* wipe all the remote databases. This is safe as we are in a transaction */
1378         w.db_id = dbid;
1379         w.transaction_id = transaction_id;
1380
1381         data.dptr = (void *)&w;
1382         data.dsize = sizeof(w);
1383
1384         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1385         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1386                                         nodes, 0,
1387                                         CONTROL_TIMEOUT(), false, data,
1388                                         NULL, NULL,
1389                                         NULL) != 0) {
1390                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1391                 talloc_free(recdb);
1392                 return -1;
1393         }
1394         
1395         /* push out the correct database. This sets the dmaster and skips 
1396            the empty records */
1397         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1398         if (ret != 0) {
1399                 talloc_free(recdb);
1400                 return -1;
1401         }
1402
1403         /* all done with this database */
1404         talloc_free(recdb);
1405
1406         return 0;
1407 }
1408
1409 /*
1410   reload the nodes file 
1411 */
1412 static void reload_nodes_file(struct ctdb_context *ctdb)
1413 {
1414         ctdb->nodes = NULL;
1415         ctdb_load_nodes_file(ctdb);
1416 }
1417
1418 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1419                                          struct ctdb_recoverd *rec,
1420                                          struct ctdb_node_map *nodemap,
1421                                          uint32_t *culprit)
1422 {
1423         int j;
1424         int ret;
1425
1426         if (ctdb->num_nodes != nodemap->num) {
1427                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1428                                   ctdb->num_nodes, nodemap->num));
1429                 if (culprit) {
1430                         *culprit = ctdb->pnn;
1431                 }
1432                 return -1;
1433         }
1434
1435         for (j=0; j<nodemap->num; j++) {
1436                 /* release any existing data */
1437                 if (ctdb->nodes[j]->known_public_ips) {
1438                         talloc_free(ctdb->nodes[j]->known_public_ips);
1439                         ctdb->nodes[j]->known_public_ips = NULL;
1440                 }
1441                 if (ctdb->nodes[j]->available_public_ips) {
1442                         talloc_free(ctdb->nodes[j]->available_public_ips);
1443                         ctdb->nodes[j]->available_public_ips = NULL;
1444                 }
1445
1446                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1447                         continue;
1448                 }
1449
1450                 /* grab a new shiny list of public ips from the node */
1451                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1452                                         CONTROL_TIMEOUT(),
1453                                         ctdb->nodes[j]->pnn,
1454                                         ctdb->nodes,
1455                                         0,
1456                                         &ctdb->nodes[j]->known_public_ips);
1457                 if (ret != 0) {
1458                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1459                                 ctdb->nodes[j]->pnn));
1460                         if (culprit) {
1461                                 *culprit = ctdb->nodes[j]->pnn;
1462                         }
1463                         return -1;
1464                 }
1465
1466                 if (ctdb->do_checkpublicip) {
1467                         if (rec->ip_check_disable_ctx == NULL) {
1468                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1469                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1470                                         rec->need_takeover_run = true;
1471                                 }
1472                         }
1473                 }
1474
1475                 /* grab a new shiny list of public ips from the node */
1476                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1477                                         CONTROL_TIMEOUT(),
1478                                         ctdb->nodes[j]->pnn,
1479                                         ctdb->nodes,
1480                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1481                                         &ctdb->nodes[j]->available_public_ips);
1482                 if (ret != 0) {
1483                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1484                                 ctdb->nodes[j]->pnn));
1485                         if (culprit) {
1486                                 *culprit = ctdb->nodes[j]->pnn;
1487                         }
1488                         return -1;
1489                 }
1490         }
1491
1492         return 0;
1493 }
1494
1495 /* when we start a recovery, make sure all nodes use the same reclock file
1496    setting
1497 */
1498 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1499 {
1500         struct ctdb_context *ctdb = rec->ctdb;
1501         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1502         TDB_DATA data;
1503         uint32_t *nodes;
1504
1505         if (ctdb->recovery_lock_file == NULL) {
1506                 data.dptr  = NULL;
1507                 data.dsize = 0;
1508         } else {
1509                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1510                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1511         }
1512
1513         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1514         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1515                                         nodes, 0,
1516                                         CONTROL_TIMEOUT(),
1517                                         false, data,
1518                                         NULL, NULL,
1519                                         rec) != 0) {
1520                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1521                 talloc_free(tmp_ctx);
1522                 return -1;
1523         }
1524
1525         talloc_free(tmp_ctx);
1526         return 0;
1527 }
1528
1529
1530 /*
1531  * this callback is called for every node that failed to execute ctdb_takeover_run()
1532  * and set flag to re-run takeover run.
1533  */
1534 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1535 {
1536         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1537
1538         if (callback_data != NULL) {
1539                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1540
1541                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1542
1543                 ctdb_set_culprit(rec, node_pnn);
1544                 rec->need_takeover_run = true;
1545         }
1546 }
1547
1548
1549 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1550 {
1551         struct ctdb_context *ctdb = rec->ctdb;
1552         int i;
1553         struct ctdb_banning_state *ban_state;
1554
1555         *self_ban = false;
1556         for (i=0; i<ctdb->num_nodes; i++) {
1557                 if (ctdb->nodes[i]->ban_state == NULL) {
1558                         continue;
1559                 }
1560                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1561                 if (ban_state->count < 2*ctdb->num_nodes) {
1562                         continue;
1563                 }
1564
1565                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1566                         ctdb->nodes[i]->pnn, ban_state->count,
1567                         ctdb->tunable.recovery_ban_period));
1568                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1569                 ban_state->count = 0;
1570
1571                 /* Banning ourself? */
1572                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1573                         *self_ban = true;
1574                 }
1575         }
1576 }
1577
1578
1579 /*
1580   we are the recmaster, and recovery is needed - start a recovery run
1581  */
1582 static int do_recovery(struct ctdb_recoverd *rec, 
1583                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1584                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1585 {
1586         struct ctdb_context *ctdb = rec->ctdb;
1587         int i, j, ret;
1588         uint32_t generation;
1589         struct ctdb_dbid_map *dbmap;
1590         TDB_DATA data;
1591         uint32_t *nodes;
1592         struct timeval start_time;
1593         uint32_t culprit = (uint32_t)-1;
1594         bool self_ban;
1595
1596         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1597
1598         /* if recovery fails, force it again */
1599         rec->need_recovery = true;
1600
1601         ban_misbehaving_nodes(rec, &self_ban);
1602         if (self_ban) {
1603                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1604                 return -1;
1605         }
1606
1607         if (ctdb->tunable.verify_recovery_lock != 0) {
1608                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1609                 start_time = timeval_current();
1610                 if (!ctdb_recovery_lock(ctdb, true)) {
1611                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1612                                          "and ban ourself for %u seconds\n",
1613                                          ctdb->tunable.recovery_ban_period));
1614                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1615                         return -1;
1616                 }
1617                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1618                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1619         }
1620
1621         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1622
1623         /* get a list of all databases */
1624         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1625         if (ret != 0) {
1626                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1627                 return -1;
1628         }
1629
1630         /* we do the db creation before we set the recovery mode, so the freeze happens
1631            on all databases we will be dealing with. */
1632
1633         /* verify that we have all the databases any other node has */
1634         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1635         if (ret != 0) {
1636                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1637                 return -1;
1638         }
1639
1640         /* verify that all other nodes have all our databases */
1641         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1642         if (ret != 0) {
1643                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1644                 return -1;
1645         }
1646         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1647
1648         /* update the database priority for all remote databases */
1649         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1650         if (ret != 0) {
1651                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1652         }
1653         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1654
1655
1656         /* update all other nodes to use the same setting for reclock files
1657            as the local recovery master.
1658         */
1659         sync_recovery_lock_file_across_cluster(rec);
1660
1661         /* set recovery mode to active on all nodes */
1662         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1663         if (ret != 0) {
1664                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1665                 return -1;
1666         }
1667
1668         /* execute the "startrecovery" event script on all nodes */
1669         ret = run_startrecovery_eventscript(rec, nodemap);
1670         if (ret!=0) {
1671                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1672                 return -1;
1673         }
1674
1675         /*
1676           update all nodes to have the same flags that we have
1677          */
1678         for (i=0;i<nodemap->num;i++) {
1679                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1680                         continue;
1681                 }
1682
1683                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1684                 if (ret != 0) {
1685                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1686                         return -1;
1687                 }
1688         }
1689
1690         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1691
1692         /* pick a new generation number */
1693         generation = new_generation();
1694
1695         /* change the vnnmap on this node to use the new generation 
1696            number but not on any other nodes.
1697            this guarantees that if we abort the recovery prematurely
1698            for some reason (a node stops responding?)
1699            that we can just return immediately and we will reenter
1700            recovery shortly again.
1701            I.e. we deliberately leave the cluster with an inconsistent
1702            generation id to allow us to abort recovery at any stage and
1703            just restart it from scratch.
1704          */
1705         vnnmap->generation = generation;
1706         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1707         if (ret != 0) {
1708                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1709                 return -1;
1710         }
1711
1712         data.dptr = (void *)&generation;
1713         data.dsize = sizeof(uint32_t);
1714
1715         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1716         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1717                                         nodes, 0,
1718                                         CONTROL_TIMEOUT(), false, data,
1719                                         NULL,
1720                                         transaction_start_fail_callback,
1721                                         rec) != 0) {
1722                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1723                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1724                                         nodes, 0,
1725                                         CONTROL_TIMEOUT(), false, tdb_null,
1726                                         NULL,
1727                                         NULL,
1728                                         NULL) != 0) {
1729                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1730                 }
1731                 return -1;
1732         }
1733
1734         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1735
1736         for (i=0;i<dbmap->num;i++) {
1737                 ret = recover_database(rec, mem_ctx,
1738                                        dbmap->dbs[i].dbid,
1739                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1740                                        pnn, nodemap, generation);
1741                 if (ret != 0) {
1742                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1743                         return -1;
1744                 }
1745         }
1746
1747         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1748
1749         /* commit all the changes */
1750         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1751                                         nodes, 0,
1752                                         CONTROL_TIMEOUT(), false, data,
1753                                         NULL, NULL,
1754                                         NULL) != 0) {
1755                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1756                 return -1;
1757         }
1758
1759         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1760         
1761
1762         /* update the capabilities for all nodes */
1763         ret = update_capabilities(ctdb, nodemap);
1764         if (ret!=0) {
1765                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1766                 return -1;
1767         }
1768
1769         /* build a new vnn map with all the currently active and
1770            unbanned nodes */
1771         generation = new_generation();
1772         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1773         CTDB_NO_MEMORY(ctdb, vnnmap);
1774         vnnmap->generation = generation;
1775         vnnmap->size = 0;
1776         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1777         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1778         for (i=j=0;i<nodemap->num;i++) {
1779                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1780                         continue;
1781                 }
1782                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1783                         /* this node can not be an lmaster */
1784                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1785                         continue;
1786                 }
1787
1788                 vnnmap->size++;
1789                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1790                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1791                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1792
1793         }
1794         if (vnnmap->size == 0) {
1795                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1796                 vnnmap->size++;
1797                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1798                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1799                 vnnmap->map[0] = pnn;
1800         }       
1801
1802         /* update to the new vnnmap on all nodes */
1803         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1804         if (ret != 0) {
1805                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1806                 return -1;
1807         }
1808
1809         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1810
1811         /* update recmaster to point to us for all nodes */
1812         ret = set_recovery_master(ctdb, nodemap, pnn);
1813         if (ret!=0) {
1814                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1815                 return -1;
1816         }
1817
1818         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1819
1820         /*
1821           update all nodes to have the same flags that we have
1822          */
1823         for (i=0;i<nodemap->num;i++) {
1824                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1825                         continue;
1826                 }
1827
1828                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1829                 if (ret != 0) {
1830                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1831                         return -1;
1832                 }
1833         }
1834
1835         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1836
1837         /* disable recovery mode */
1838         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1839         if (ret != 0) {
1840                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1841                 return -1;
1842         }
1843
1844         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1845
1846         /* Fetch known/available public IPs from each active node */
1847         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1848         if (ret != 0) {
1849                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1850                                  culprit));
1851                 rec->need_takeover_run = true;
1852                 return -1;
1853         }
1854         rec->need_takeover_run = false;
1855         ret = ctdb_takeover_run(ctdb, nodemap, takeover_fail_callback, NULL);
1856         if (ret != 0) {
1857                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1858                 rec->need_takeover_run = true;
1859         }
1860
1861         /* execute the "recovered" event script on all nodes */
1862         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
1863         if (ret!=0) {
1864                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1865                 return -1;
1866         }
1867
1868         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1869
1870         /* send a message to all clients telling them that the cluster 
1871            has been reconfigured */
1872         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1873
1874         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1875
1876         rec->need_recovery = false;
1877
1878         /* we managed to complete a full recovery, make sure to forgive
1879            any past sins by the nodes that could now participate in the
1880            recovery.
1881         */
1882         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1883         for (i=0;i<nodemap->num;i++) {
1884                 struct ctdb_banning_state *ban_state;
1885
1886                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1887                         continue;
1888                 }
1889
1890                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1891                 if (ban_state == NULL) {
1892                         continue;
1893                 }
1894
1895                 ban_state->count = 0;
1896         }
1897
1898
1899         /* We just finished a recovery successfully. 
1900            We now wait for rerecovery_timeout before we allow 
1901            another recovery to take place.
1902         */
1903         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1904         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1905         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1906
1907         return 0;
1908 }
1909
1910
1911 /*
1912   elections are won by first checking the number of connected nodes, then
1913   the priority time, then the pnn
1914  */
1915 struct election_message {
1916         uint32_t num_connected;
1917         struct timeval priority_time;
1918         uint32_t pnn;
1919         uint32_t node_flags;
1920 };
1921
1922 /*
1923   form this nodes election data
1924  */
1925 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1926 {
1927         int ret, i;
1928         struct ctdb_node_map *nodemap;
1929         struct ctdb_context *ctdb = rec->ctdb;
1930
1931         ZERO_STRUCTP(em);
1932
1933         em->pnn = rec->ctdb->pnn;
1934         em->priority_time = rec->priority_time;
1935
1936         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1937         if (ret != 0) {
1938                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1939                 return;
1940         }
1941
1942         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1943         em->node_flags = rec->node_flags;
1944
1945         for (i=0;i<nodemap->num;i++) {
1946                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1947                         em->num_connected++;
1948                 }
1949         }
1950
1951         /* we shouldnt try to win this election if we cant be a recmaster */
1952         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1953                 em->num_connected = 0;
1954                 em->priority_time = timeval_current();
1955         }
1956
1957         talloc_free(nodemap);
1958 }
1959
1960 /*
1961   see if the given election data wins
1962  */
1963 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1964 {
1965         struct election_message myem;
1966         int cmp = 0;
1967
1968         ctdb_election_data(rec, &myem);
1969
1970         /* we cant win if we dont have the recmaster capability */
1971         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1972                 return false;
1973         }
1974
1975         /* we cant win if we are banned */
1976         if (rec->node_flags & NODE_FLAGS_BANNED) {
1977                 return false;
1978         }
1979
1980         /* we cant win if we are stopped */
1981         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1982                 return false;
1983         }
1984
1985         /* we will automatically win if the other node is banned */
1986         if (em->node_flags & NODE_FLAGS_BANNED) {
1987                 return true;
1988         }
1989
1990         /* we will automatically win if the other node is banned */
1991         if (em->node_flags & NODE_FLAGS_STOPPED) {
1992                 return true;
1993         }
1994
1995         /* try to use the most connected node */
1996         if (cmp == 0) {
1997                 cmp = (int)myem.num_connected - (int)em->num_connected;
1998         }
1999
2000         /* then the longest running node */
2001         if (cmp == 0) {
2002                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2003         }
2004
2005         if (cmp == 0) {
2006                 cmp = (int)myem.pnn - (int)em->pnn;
2007         }
2008
2009         return cmp > 0;
2010 }
2011
2012 /*
2013   send out an election request
2014  */
2015 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
2016 {
2017         int ret;
2018         TDB_DATA election_data;
2019         struct election_message emsg;
2020         uint64_t srvid;
2021         struct ctdb_context *ctdb = rec->ctdb;
2022
2023         srvid = CTDB_SRVID_RECOVERY;
2024
2025         ctdb_election_data(rec, &emsg);
2026
2027         election_data.dsize = sizeof(struct election_message);
2028         election_data.dptr  = (unsigned char *)&emsg;
2029
2030
2031         /* send an election message to all active nodes */
2032         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2033         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2034
2035
2036         /* A new node that is already frozen has entered the cluster.
2037            The existing nodes are not frozen and dont need to be frozen
2038            until the election has ended and we start the actual recovery
2039         */
2040         if (update_recmaster == true) {
2041                 /* first we assume we will win the election and set 
2042                    recoverymaster to be ourself on the current node
2043                  */
2044                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2045                 if (ret != 0) {
2046                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2047                         return -1;
2048                 }
2049         }
2050
2051
2052         return 0;
2053 }
2054
2055 /*
2056   this function will unban all nodes in the cluster
2057 */
2058 static void unban_all_nodes(struct ctdb_context *ctdb)
2059 {
2060         int ret, i;
2061         struct ctdb_node_map *nodemap;
2062         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2063         
2064         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2065         if (ret != 0) {
2066                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2067                 return;
2068         }
2069
2070         for (i=0;i<nodemap->num;i++) {
2071                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2072                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2073                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
2074                 }
2075         }
2076
2077         talloc_free(tmp_ctx);
2078 }
2079
2080
2081 /*
2082   we think we are winning the election - send a broadcast election request
2083  */
2084 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2085 {
2086         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2087         int ret;
2088
2089         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
2090         if (ret != 0) {
2091                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2092         }
2093
2094         talloc_free(rec->send_election_te);
2095         rec->send_election_te = NULL;
2096 }
2097
2098 /*
2099   handler for memory dumps
2100 */
2101 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2102                              TDB_DATA data, void *private_data)
2103 {
2104         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2105         TDB_DATA *dump;
2106         int ret;
2107         struct rd_memdump_reply *rd;
2108
2109         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2110                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2111                 talloc_free(tmp_ctx);
2112                 return;
2113         }
2114         rd = (struct rd_memdump_reply *)data.dptr;
2115
2116         dump = talloc_zero(tmp_ctx, TDB_DATA);
2117         if (dump == NULL) {
2118                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2119                 talloc_free(tmp_ctx);
2120                 return;
2121         }
2122         ret = ctdb_dump_memory(ctdb, dump);
2123         if (ret != 0) {
2124                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2125                 talloc_free(tmp_ctx);
2126                 return;
2127         }
2128
2129 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
2130
2131         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2132         if (ret != 0) {
2133                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2134                 talloc_free(tmp_ctx);
2135                 return;
2136         }
2137
2138         talloc_free(tmp_ctx);
2139 }
2140
2141 /*
2142   handler for getlog
2143 */
2144 static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2145                            TDB_DATA data, void *private_data)
2146 {
2147         struct ctdb_get_log_addr *log_addr;
2148         pid_t child;
2149
2150         if (data.dsize != sizeof(struct ctdb_get_log_addr)) {
2151                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2152                 return;
2153         }
2154         log_addr = (struct ctdb_get_log_addr *)data.dptr;
2155
2156         child = ctdb_fork_no_free_ringbuffer(ctdb);
2157         if (child == (pid_t)-1) {
2158                 DEBUG(DEBUG_ERR,("Failed to fork a log collector child\n"));
2159                 return;
2160         }
2161
2162         if (child == 0) {
2163                 if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
2164                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch log collector child into client mode.\n"));
2165                         _exit(1);
2166                 }
2167                 ctdb_collect_log(ctdb, log_addr);
2168                 _exit(0);
2169         }
2170 }
2171
2172 /*
2173   handler for clearlog
2174 */
2175 static void clearlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2176                              TDB_DATA data, void *private_data)
2177 {
2178         ctdb_clear_log(ctdb);
2179 }
2180
2181 /*
2182   handler for reload_nodes
2183 */
2184 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2185                              TDB_DATA data, void *private_data)
2186 {
2187         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2188
2189         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2190
2191         reload_nodes_file(rec->ctdb);
2192 }
2193
2194
2195 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
2196                               struct timeval yt, void *p)
2197 {
2198         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2199
2200         talloc_free(rec->ip_check_disable_ctx);
2201         rec->ip_check_disable_ctx = NULL;
2202 }
2203
2204
2205 static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
2206                                   struct timeval t, void *p)
2207 {
2208         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2209         struct ctdb_context *ctdb = rec->ctdb;
2210         int ret;
2211
2212         DEBUG(DEBUG_NOTICE,("Rebalance all nodes that have had ip assignment changes.\n"));
2213
2214         ret = ctdb_takeover_run(ctdb, rec->nodemap, takeover_fail_callback, NULL);
2215         if (ret != 0) {
2216                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
2217                 rec->need_takeover_run = true;
2218         }
2219
2220         talloc_free(rec->deferred_rebalance_ctx);
2221         rec->deferred_rebalance_ctx = NULL;
2222 }
2223
2224         
2225 static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2226                              TDB_DATA data, void *private_data)
2227 {
2228         uint32_t pnn;
2229         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2230
2231         if (data.dsize != sizeof(uint32_t)) {
2232                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2233                 return;
2234         }
2235
2236         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2237                 return;
2238         }
2239
2240         pnn = *(uint32_t *)&data.dptr[0];
2241
2242         lcp2_forcerebalance(ctdb, pnn);
2243         DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
2244
2245         if (rec->deferred_rebalance_ctx != NULL) {
2246                 talloc_free(rec->deferred_rebalance_ctx);
2247         }
2248         rec->deferred_rebalance_ctx = talloc_new(rec);
2249         event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
2250                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2251                         ctdb_rebalance_timeout, rec);
2252 }
2253
2254
2255
2256 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2257                              TDB_DATA data, void *private_data)
2258 {
2259         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2260         struct ctdb_public_ip *ip;
2261
2262         if (rec->recmaster != rec->ctdb->pnn) {
2263                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2264                 return;
2265         }
2266
2267         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2268                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2269                 return;
2270         }
2271
2272         ip = (struct ctdb_public_ip *)data.dptr;
2273
2274         update_ip_assignment_tree(rec->ctdb, ip);
2275 }
2276
2277
2278 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2279                              TDB_DATA data, void *private_data)
2280 {
2281         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2282         uint32_t timeout;
2283
2284         if (rec->ip_check_disable_ctx != NULL) {
2285                 talloc_free(rec->ip_check_disable_ctx);
2286                 rec->ip_check_disable_ctx = NULL;
2287         }
2288
2289         if (data.dsize != sizeof(uint32_t)) {
2290                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2291                                  "expexting %lu\n", (long unsigned)data.dsize,
2292                                  (long unsigned)sizeof(uint32_t)));
2293                 return;
2294         }
2295         if (data.dptr == NULL) {
2296                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
2297                 return;
2298         }
2299
2300         timeout = *((uint32_t *)data.dptr);
2301
2302         if (timeout == 0) {
2303                 DEBUG(DEBUG_NOTICE,("Reenabling ip check\n"));
2304                 return;
2305         }
2306                 
2307         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
2308
2309         rec->ip_check_disable_ctx = talloc_new(rec);
2310         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2311
2312         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2313 }
2314
2315
2316 /*
2317   handler for reload all ips.
2318 */
2319 static void ip_reloadall_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2320                              TDB_DATA data, void *private_data)
2321 {
2322         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2323
2324         if (data.dsize != sizeof(struct reloadips_all_reply)) {
2325                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2326                 return;
2327         }
2328
2329         reload_all_ips_request = (struct reloadips_all_reply *)talloc_steal(rec, data.dptr);
2330
2331         DEBUG(DEBUG_NOTICE,("RELOAD_ALL_IPS message received from node:%d srvid:%d\n", reload_all_ips_request->pnn, (int)reload_all_ips_request->srvid));
2332         return;
2333 }
2334
2335 static void async_reloadips_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2336 {
2337         uint32_t *status = callback_data;
2338
2339         if (res != 0) {
2340                 DEBUG(DEBUG_ERR,("Reload ips all failed on node %d\n", node_pnn));
2341                 *status = 1;
2342         }
2343 }
2344
2345 static int
2346 reload_all_ips(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, struct reloadips_all_reply *rips)
2347 {
2348         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2349         uint32_t *nodes;
2350         uint32_t status;
2351         int i;
2352
2353         DEBUG(DEBUG_ERR,("RELOAD ALL IPS on all active nodes\n"));
2354         for (i = 0; i< nodemap->num; i++) {
2355                 if (nodemap->nodes[i].flags != 0) {
2356                         DEBUG(DEBUG_ERR, ("Can not reload ips on all nodes. Node %d is not up and healthy\n", i));
2357                         talloc_free(tmp_ctx);
2358                         return -1;
2359                 }
2360         }
2361
2362         /* send the flags update to all connected nodes */
2363         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2364         status = 0;
2365         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
2366                                         nodes, 0,
2367                                         CONTROL_TIMEOUT(),
2368                                         false, tdb_null,
2369                                         async_reloadips_callback, NULL,
2370                                         &status) != 0) {
2371                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2372                 talloc_free(tmp_ctx);
2373                 return -1;
2374         }
2375
2376         if (status != 0) {
2377                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2378                 talloc_free(tmp_ctx);
2379                 return -1;
2380         }
2381
2382         ctdb_client_send_message(ctdb, rips->pnn, rips->srvid, tdb_null);
2383
2384         talloc_free(tmp_ctx);
2385         return 0;
2386 }
2387
2388
2389 /*
2390   handler for ip reallocate, just add it to the list of callers and 
2391   handle this later in the monitor_cluster loop so we do not recurse
2392   with other callers to takeover_run()
2393 */
2394 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2395                              TDB_DATA data, void *private_data)
2396 {
2397         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2398         struct ip_reallocate_list *caller;
2399
2400         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2401                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2402                 return;
2403         }
2404
2405         if (rec->ip_reallocate_ctx == NULL) {
2406                 rec->ip_reallocate_ctx = talloc_new(rec);
2407                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2408         }
2409
2410         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2411         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2412
2413         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2414         caller->next = rec->reallocate_callers;
2415         rec->reallocate_callers = caller;
2416
2417         return;
2418 }
2419
2420 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2421 {
2422         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2423         TDB_DATA result;
2424         int32_t ret;
2425         struct ip_reallocate_list *callers;
2426         uint32_t culprit;
2427
2428         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2429
2430         /* update the list of public ips that a node can handle for
2431            all connected nodes
2432         */
2433         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2434         if (ret != 0) {
2435                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2436                                  culprit));
2437                 rec->need_takeover_run = true;
2438         }
2439         if (ret == 0) {
2440                 ret = ctdb_takeover_run(ctdb, rec->nodemap, takeover_fail_callback, NULL);
2441                 if (ret != 0) {
2442                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2443                         rec->need_takeover_run = true;
2444                 }
2445         }
2446
2447         result.dsize = sizeof(int32_t);
2448         result.dptr  = (uint8_t *)&ret;
2449
2450         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2451
2452                 /* Someone that sent srvid==0 does not want a reply */
2453                 if (callers->rd->srvid == 0) {
2454                         continue;
2455                 }
2456                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2457                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2458                                   (unsigned long long)callers->rd->srvid));
2459                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2460                 if (ret != 0) {
2461                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2462                                          "message to %u:%llu\n",
2463                                          (unsigned)callers->rd->pnn,
2464                                          (unsigned long long)callers->rd->srvid));
2465                 }
2466         }
2467
2468         talloc_free(tmp_ctx);
2469         talloc_free(rec->ip_reallocate_ctx);
2470         rec->ip_reallocate_ctx = NULL;
2471         rec->reallocate_callers = NULL;
2472         
2473 }
2474
2475
2476 /*
2477   handler for recovery master elections
2478 */
2479 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2480                              TDB_DATA data, void *private_data)
2481 {
2482         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2483         int ret;
2484         struct election_message *em = (struct election_message *)data.dptr;
2485         TALLOC_CTX *mem_ctx;
2486
2487         /* we got an election packet - update the timeout for the election */
2488         talloc_free(rec->election_timeout);
2489         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2490                                                 fast_start ?
2491                                                 timeval_current_ofs(0, 500000) :
2492                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2493                                                 ctdb_election_timeout, rec);
2494
2495         mem_ctx = talloc_new(ctdb);
2496
2497         /* someone called an election. check their election data
2498            and if we disagree and we would rather be the elected node, 
2499            send a new election message to all other nodes
2500          */
2501         if (ctdb_election_win(rec, em)) {
2502                 if (!rec->send_election_te) {
2503                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2504                                                                 timeval_current_ofs(0, 500000),
2505                                                                 election_send_request, rec);
2506                 }
2507                 talloc_free(mem_ctx);
2508                 /*unban_all_nodes(ctdb);*/
2509                 return;
2510         }
2511         
2512         /* we didn't win */
2513         talloc_free(rec->send_election_te);
2514         rec->send_election_te = NULL;
2515
2516         if (ctdb->tunable.verify_recovery_lock != 0) {
2517                 /* release the recmaster lock */
2518                 if (em->pnn != ctdb->pnn &&
2519                     ctdb->recovery_lock_fd != -1) {
2520                         close(ctdb->recovery_lock_fd);
2521                         ctdb->recovery_lock_fd = -1;
2522                         unban_all_nodes(ctdb);
2523                 }
2524         }
2525
2526         /* ok, let that guy become recmaster then */
2527         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2528         if (ret != 0) {
2529                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2530                 talloc_free(mem_ctx);
2531                 return;
2532         }
2533
2534         talloc_free(mem_ctx);
2535         return;
2536 }
2537
2538
2539 /*
2540   force the start of the election process
2541  */
2542 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2543                            struct ctdb_node_map *nodemap)
2544 {
2545         int ret;
2546         struct ctdb_context *ctdb = rec->ctdb;
2547
2548         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2549
2550         /* set all nodes to recovery mode to stop all internode traffic */
2551         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2552         if (ret != 0) {
2553                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2554                 return;
2555         }
2556
2557         talloc_free(rec->election_timeout);
2558         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2559                                                 fast_start ?
2560                                                 timeval_current_ofs(0, 500000) :
2561                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2562                                                 ctdb_election_timeout, rec);
2563
2564         ret = send_election_request(rec, pnn, true);
2565         if (ret!=0) {
2566                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2567                 return;
2568         }
2569
2570         /* wait for a few seconds to collect all responses */
2571         ctdb_wait_election(rec);
2572 }
2573
2574
2575
2576 /*
2577   handler for when a node changes its flags
2578 */
2579 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2580                             TDB_DATA data, void *private_data)
2581 {
2582         int ret;
2583         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2584         struct ctdb_node_map *nodemap=NULL;
2585         TALLOC_CTX *tmp_ctx;
2586         int i;
2587         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2588         int disabled_flag_changed;
2589
2590         if (data.dsize != sizeof(*c)) {
2591                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2592                 return;
2593         }
2594
2595         tmp_ctx = talloc_new(ctdb);
2596         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2597
2598         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2599         if (ret != 0) {
2600                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2601                 talloc_free(tmp_ctx);
2602                 return;         
2603         }
2604
2605
2606         for (i=0;i<nodemap->num;i++) {
2607                 if (nodemap->nodes[i].pnn == c->pnn) break;
2608         }
2609
2610         if (i == nodemap->num) {
2611                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2612                 talloc_free(tmp_ctx);
2613                 return;
2614         }
2615
2616         if (nodemap->nodes[i].flags != c->new_flags) {
2617                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, nodemap->nodes[i].flags));
2618         }
2619
2620         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2621
2622         nodemap->nodes[i].flags = c->new_flags;
2623
2624         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2625                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2626
2627         if (ret == 0) {
2628                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2629                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2630         }
2631         
2632         if (ret == 0 &&
2633             ctdb->recovery_master == ctdb->pnn &&
2634             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2635                 /* Only do the takeover run if the perm disabled or unhealthy
2636                    flags changed since these will cause an ip failover but not
2637                    a recovery.
2638                    If the node became disconnected or banned this will also
2639                    lead to an ip address failover but that is handled 
2640                    during recovery
2641                 */
2642                 if (disabled_flag_changed) {
2643                         rec->need_takeover_run = true;
2644                 }
2645         }
2646
2647         talloc_free(tmp_ctx);
2648 }
2649
2650 /*
2651   handler for when we need to push out flag changes ot all other nodes
2652 */
2653 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2654                             TDB_DATA data, void *private_data)
2655 {
2656         int ret;
2657         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2658         struct ctdb_node_map *nodemap=NULL;
2659         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2660         uint32_t recmaster;
2661         uint32_t *nodes;
2662
2663         /* find the recovery master */
2664         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2665         if (ret != 0) {
2666                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2667                 talloc_free(tmp_ctx);
2668                 return;
2669         }
2670
2671         /* read the node flags from the recmaster */
2672         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2673         if (ret != 0) {
2674                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2675                 talloc_free(tmp_ctx);
2676                 return;
2677         }
2678         if (c->pnn >= nodemap->num) {
2679                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2680                 talloc_free(tmp_ctx);
2681                 return;
2682         }
2683
2684         /* send the flags update to all connected nodes */
2685         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2686
2687         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2688                                       nodes, 0, CONTROL_TIMEOUT(),
2689                                       false, data,
2690                                       NULL, NULL,
2691                                       NULL) != 0) {
2692                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2693
2694                 talloc_free(tmp_ctx);
2695                 return;
2696         }
2697
2698         talloc_free(tmp_ctx);
2699 }
2700
2701
2702 struct verify_recmode_normal_data {
2703         uint32_t count;
2704         enum monitor_result status;
2705 };
2706
2707 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2708 {
2709         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2710
2711
2712         /* one more node has responded with recmode data*/
2713         rmdata->count--;
2714
2715         /* if we failed to get the recmode, then return an error and let
2716            the main loop try again.
2717         */
2718         if (state->state != CTDB_CONTROL_DONE) {
2719                 if (rmdata->status == MONITOR_OK) {
2720                         rmdata->status = MONITOR_FAILED;
2721                 }
2722                 return;
2723         }
2724
2725         /* if we got a response, then the recmode will be stored in the
2726            status field
2727         */
2728         if (state->status != CTDB_RECOVERY_NORMAL) {
2729                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2730                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2731         }
2732
2733         return;
2734 }
2735
2736
2737 /* verify that all nodes are in normal recovery mode */
2738 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2739 {
2740         struct verify_recmode_normal_data *rmdata;
2741         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2742         struct ctdb_client_control_state *state;
2743         enum monitor_result status;
2744         int j;
2745         
2746         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2747         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2748         rmdata->count  = 0;
2749         rmdata->status = MONITOR_OK;
2750
2751         /* loop over all active nodes and send an async getrecmode call to 
2752            them*/
2753         for (j=0; j<nodemap->num; j++) {
2754                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2755                         continue;
2756                 }
2757                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2758                                         CONTROL_TIMEOUT(), 
2759                                         nodemap->nodes[j].pnn);
2760                 if (state == NULL) {
2761                         /* we failed to send the control, treat this as 
2762                            an error and try again next iteration
2763                         */                      
2764                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2765                         talloc_free(mem_ctx);
2766                         return MONITOR_FAILED;
2767                 }
2768
2769                 /* set up the callback functions */
2770                 state->async.fn = verify_recmode_normal_callback;
2771                 state->async.private_data = rmdata;
2772
2773                 /* one more control to wait for to complete */
2774                 rmdata->count++;
2775         }
2776
2777
2778         /* now wait for up to the maximum number of seconds allowed
2779            or until all nodes we expect a response from has replied
2780         */
2781         while (rmdata->count > 0) {
2782                 event_loop_once(ctdb->ev);
2783         }
2784
2785         status = rmdata->status;
2786         talloc_free(mem_ctx);
2787         return status;
2788 }
2789
2790
2791 struct verify_recmaster_data {
2792         struct ctdb_recoverd *rec;
2793         uint32_t count;
2794         uint32_t pnn;
2795         enum monitor_result status;
2796 };
2797
2798 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2799 {
2800         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2801
2802
2803         /* one more node has responded with recmaster data*/
2804         rmdata->count--;
2805
2806         /* if we failed to get the recmaster, then return an error and let
2807            the main loop try again.
2808         */
2809         if (state->state != CTDB_CONTROL_DONE) {
2810                 if (rmdata->status == MONITOR_OK) {
2811                         rmdata->status = MONITOR_FAILED;
2812                 }
2813                 return;
2814         }
2815
2816         /* if we got a response, then the recmaster will be stored in the
2817            status field
2818         */
2819         if (state->status != rmdata->pnn) {
2820                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2821                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2822                 rmdata->status = MONITOR_ELECTION_NEEDED;
2823         }
2824
2825         return;
2826 }
2827
2828
2829 /* verify that all nodes agree that we are the recmaster */
2830 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2831 {
2832         struct ctdb_context *ctdb = rec->ctdb;
2833         struct verify_recmaster_data *rmdata;
2834         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2835         struct ctdb_client_control_state *state;
2836         enum monitor_result status;
2837         int j;
2838         
2839         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2840         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2841         rmdata->rec    = rec;
2842         rmdata->count  = 0;
2843         rmdata->pnn    = pnn;
2844         rmdata->status = MONITOR_OK;
2845
2846         /* loop over all active nodes and send an async getrecmaster call to 
2847            them*/
2848         for (j=0; j<nodemap->num; j++) {
2849                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2850                         continue;
2851                 }
2852                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2853                                         CONTROL_TIMEOUT(),
2854                                         nodemap->nodes[j].pnn);
2855                 if (state == NULL) {
2856                         /* we failed to send the control, treat this as 
2857                            an error and try again next iteration
2858                         */                      
2859                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2860                         talloc_free(mem_ctx);
2861                         return MONITOR_FAILED;
2862                 }
2863
2864                 /* set up the callback functions */
2865                 state->async.fn = verify_recmaster_callback;
2866                 state->async.private_data = rmdata;
2867
2868                 /* one more control to wait for to complete */
2869                 rmdata->count++;
2870         }
2871
2872
2873         /* now wait for up to the maximum number of seconds allowed
2874            or until all nodes we expect a response from has replied
2875         */
2876         while (rmdata->count > 0) {
2877                 event_loop_once(ctdb->ev);
2878         }
2879
2880         status = rmdata->status;
2881         talloc_free(mem_ctx);
2882         return status;
2883 }
2884
2885 static bool interfaces_have_changed(struct ctdb_context *ctdb,
2886                                     struct ctdb_recoverd *rec)
2887 {
2888         struct ctdb_control_get_ifaces *ifaces = NULL;
2889         TALLOC_CTX *mem_ctx;
2890         bool ret = false;
2891
2892         mem_ctx = talloc_new(NULL);
2893
2894         /* Read the interfaces from the local node */
2895         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
2896                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
2897                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
2898                 /* We could return an error.  However, this will be
2899                  * rare so we'll decide that the interfaces have
2900                  * actually changed, just in case.
2901                  */
2902                 talloc_free(mem_ctx);
2903                 return true;
2904         }
2905
2906         if (!rec->ifaces) {
2907                 /* We haven't been here before so things have changed */
2908                 ret = true;
2909         } else if (rec->ifaces->num != ifaces->num) {
2910                 /* Number of interfaces has changed */
2911                 ret = true;
2912         } else {
2913                 /* See if interface names or link states have changed */
2914                 int i;
2915                 for (i = 0; i < rec->ifaces->num; i++) {
2916                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
2917                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0 ||
2918                             iface->link_state != ifaces->ifaces[i].link_state) {
2919                                 ret = true;
2920                                 break;
2921                         }
2922                 }
2923         }
2924
2925         talloc_free(rec->ifaces);
2926         rec->ifaces = talloc_steal(rec, ifaces);
2927
2928         talloc_free(mem_ctx);
2929         return ret;
2930 }
2931
2932 /* called to check that the local allocation of public ip addresses is ok.
2933 */
2934 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2935 {
2936         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2937         struct ctdb_uptime *uptime1 = NULL;
2938         struct ctdb_uptime *uptime2 = NULL;
2939         int ret, j;
2940         bool need_takeover_run = false;
2941
2942         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2943                                 CTDB_CURRENT_NODE, &uptime1);
2944         if (ret != 0) {
2945                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2946                 talloc_free(mem_ctx);
2947                 return -1;
2948         }
2949
2950         if (interfaces_have_changed(ctdb, rec)) {
2951                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2952                                      "local node %u - force takeover run\n",
2953                                      pnn));
2954                 need_takeover_run = true;
2955         }
2956
2957         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2958                                 CTDB_CURRENT_NODE, &uptime2);
2959         if (ret != 0) {
2960                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2961                 talloc_free(mem_ctx);
2962                 return -1;
2963         }
2964
2965         /* skip the check if the startrecovery time has changed */
2966         if (timeval_compare(&uptime1->last_recovery_started,
2967                             &uptime2->last_recovery_started) != 0) {
2968                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2969                 talloc_free(mem_ctx);
2970                 return 0;
2971         }
2972
2973         /* skip the check if the endrecovery time has changed */
2974         if (timeval_compare(&uptime1->last_recovery_finished,
2975                             &uptime2->last_recovery_finished) != 0) {
2976                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2977                 talloc_free(mem_ctx);
2978                 return 0;
2979         }
2980
2981         /* skip the check if we have started but not finished recovery */
2982         if (timeval_compare(&uptime1->last_recovery_finished,
2983                             &uptime1->last_recovery_started) != 1) {
2984                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2985                 talloc_free(mem_ctx);
2986
2987                 return 0;
2988         }
2989
2990         /* verify that we have the ip addresses we should have
2991            and we dont have ones we shouldnt have.
2992            if we find an inconsistency we set recmode to
2993            active on the local node and wait for the recmaster
2994            to do a full blown recovery.
2995            also if the pnn is -1 and we are healthy and can host the ip
2996            we also request a ip reallocation.
2997         */
2998         if (ctdb->tunable.disable_ip_failover == 0) {
2999                 struct ctdb_all_public_ips *ips = NULL;
3000
3001                 /* read the *available* IPs from the local node */
3002                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3003                 if (ret != 0) {
3004                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3005                         talloc_free(mem_ctx);
3006                         return -1;
3007                 }
3008
3009                 for (j=0; j<ips->num; j++) {
3010                         if (ips->ips[j].pnn == -1 &&
3011                             nodemap->nodes[pnn].flags == 0) {
3012                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3013                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3014                                 need_takeover_run = true;
3015                         }
3016                 }
3017
3018                 talloc_free(ips);
3019
3020                 /* read the *known* IPs from the local node */
3021                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3022                 if (ret != 0) {
3023                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3024                         talloc_free(mem_ctx);
3025                         return -1;
3026                 }
3027
3028                 for (j=0; j<ips->num; j++) {
3029                         if (ips->ips[j].pnn == pnn) {
3030                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3031                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3032                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3033                                         need_takeover_run = true;
3034                                 }
3035                         } else {
3036                                 if (ctdb->do_checkpublicip &&
3037                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3038
3039                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3040                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3041
3042                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3043                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3044                                         }
3045                                 }
3046                         }
3047                 }
3048         }
3049
3050         if (need_takeover_run) {
3051                 struct takeover_run_reply rd;
3052                 TDB_DATA data;
3053
3054                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3055
3056                 rd.pnn = ctdb->pnn;
3057                 rd.srvid = 0;
3058                 data.dptr = (uint8_t *)&rd;
3059                 data.dsize = sizeof(rd);
3060
3061                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3062                 if (ret != 0) {
3063                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3064                 }
3065         }
3066         talloc_free(mem_ctx);
3067         return 0;
3068 }
3069
3070
3071 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3072 {
3073         struct ctdb_node_map **remote_nodemaps = callback_data;
3074
3075         if (node_pnn >= ctdb->num_nodes) {
3076                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3077                 return;
3078         }
3079
3080         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3081
3082 }
3083
3084 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3085         struct ctdb_node_map *nodemap,
3086         struct ctdb_node_map **remote_nodemaps)
3087 {
3088         uint32_t *nodes;
3089
3090         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3091         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3092                                         nodes, 0,
3093                                         CONTROL_TIMEOUT(), false, tdb_null,
3094                                         async_getnodemap_callback,
3095                                         NULL,
3096                                         remote_nodemaps) != 0) {
3097                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3098
3099                 return -1;
3100         }
3101
3102         return 0;
3103 }
3104
3105 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
3106 struct ctdb_check_reclock_state {
3107         struct ctdb_context *ctdb;
3108         struct timeval start_time;
3109         int fd[2];
3110         pid_t child;
3111         struct timed_event *te;
3112         struct fd_event *fde;
3113         enum reclock_child_status status;
3114 };
3115
3116 /* when we free the reclock state we must kill any child process.
3117 */
3118 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
3119 {
3120         struct ctdb_context *ctdb = state->ctdb;
3121
3122         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
3123
3124         if (state->fd[0] != -1) {
3125                 close(state->fd[0]);
3126                 state->fd[0] = -1;
3127         }
3128         if (state->fd[1] != -1) {
3129                 close(state->fd[1]);
3130                 state->fd[1] = -1;
3131         }
3132         ctdb_kill(ctdb, state->child, SIGKILL);
3133         return 0;
3134 }
3135
3136 /*
3137   called if our check_reclock child times out. this would happen if
3138   i/o to the reclock file blocks.
3139  */
3140 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
3141                                          struct timeval t, void *private_data)
3142 {
3143         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
3144                                            struct ctdb_check_reclock_state);
3145
3146         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
3147         state->status = RECLOCK_TIMEOUT;
3148 }
3149
3150 /* this is called when the child process has completed checking the reclock
3151    file and has written data back to us through the pipe.
3152 */
3153 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
3154                              uint16_t flags, void *private_data)
3155 {
3156         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
3157                                              struct ctdb_check_reclock_state);
3158         char c = 0;
3159         int ret;
3160
3161         /* we got a response from our child process so we can abort the
3162            timeout.
3163         */
3164         talloc_free(state->te);
3165         state->te = NULL;
3166
3167         ret = read(state->fd[0], &c, 1);
3168         if (ret != 1 || c != RECLOCK_OK) {
3169                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3170                 state->status = RECLOCK_FAILED;
3171
3172                 return;
3173         }
3174
3175         state->status = RECLOCK_OK;
3176         return;
3177 }
3178
3179 static int check_recovery_lock(struct ctdb_context *ctdb)
3180 {
3181         int ret;
3182         struct ctdb_check_reclock_state *state;
3183         pid_t parent = getpid();
3184
3185         if (ctdb->recovery_lock_fd == -1) {
3186                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3187                 return -1;
3188         }
3189
3190         state = talloc(ctdb, struct ctdb_check_reclock_state);
3191         CTDB_NO_MEMORY(ctdb, state);
3192
3193         state->ctdb = ctdb;
3194         state->start_time = timeval_current();
3195         state->status = RECLOCK_CHECKING;
3196         state->fd[0] = -1;
3197         state->fd[1] = -1;
3198
3199         ret = pipe(state->fd);
3200         if (ret != 0) {
3201                 talloc_free(state);
3202                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3203                 return -1;
3204         }
3205
3206         state->child = ctdb_fork(ctdb);
3207         if (state->child == (pid_t)-1) {
3208                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3209                 close(state->fd[0]);
3210                 state->fd[0] = -1;
3211                 close(state->fd[1]);
3212                 state->fd[1] = -1;
3213                 talloc_free(state);
3214                 return -1;
3215         }
3216
3217         if (state->child == 0) {
3218                 char cc = RECLOCK_OK;
3219                 close(state->fd[0]);
3220                 state->fd[0] = -1;
3221
3222                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3223                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3224                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3225                         cc = RECLOCK_FAILED;
3226                 }
3227
3228                 write(state->fd[1], &cc, 1);
3229                 /* make sure we die when our parent dies */
3230                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3231                         sleep(5);
3232                 }
3233                 _exit(0);
3234         }
3235         close(state->fd[1]);
3236         state->fd[1] = -1;
3237         set_close_on_exec(state->fd[0]);
3238
3239         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3240
3241         talloc_set_destructor(state, check_reclock_destructor);
3242
3243         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3244                                     ctdb_check_reclock_timeout, state);
3245         if (state->te == NULL) {
3246                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3247                 talloc_free(state);
3248                 return -1;
3249         }
3250
3251         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3252                                 EVENT_FD_READ,
3253                                 reclock_child_handler,
3254                                 (void *)state);
3255
3256         if (state->fde == NULL) {
3257                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3258                 talloc_free(state);
3259                 return -1;
3260         }
3261         tevent_fd_set_auto_close(state->fde);
3262
3263         while (state->status == RECLOCK_CHECKING) {
3264                 event_loop_once(ctdb->ev);
3265         }
3266
3267         if (state->status == RECLOCK_FAILED) {
3268                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3269                 close(ctdb->recovery_lock_fd);
3270                 ctdb->recovery_lock_fd = -1;
3271                 talloc_free(state);
3272                 return -1;
3273         }
3274
3275         talloc_free(state);
3276         return 0;
3277 }
3278
3279 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3280 {
3281         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3282         const char *reclockfile;
3283
3284         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3285                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3286                 talloc_free(tmp_ctx);
3287                 return -1;      
3288         }
3289
3290         if (reclockfile == NULL) {
3291                 if (ctdb->recovery_lock_file != NULL) {
3292                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3293                         talloc_free(ctdb->recovery_lock_file);
3294                         ctdb->recovery_lock_file = NULL;
3295                         if (ctdb->recovery_lock_fd != -1) {
3296                                 close(ctdb->recovery_lock_fd);
3297                                 ctdb->recovery_lock_fd = -1;
3298                         }
3299                 }
3300                 ctdb->tunable.verify_recovery_lock = 0;
3301                 talloc_free(tmp_ctx);
3302                 return 0;
3303         }
3304
3305         if (ctdb->recovery_lock_file == NULL) {
3306                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3307                 if (ctdb->recovery_lock_fd != -1) {
3308                         close(ctdb->recovery_lock_fd);
3309                         ctdb->recovery_lock_fd = -1;
3310                 }
3311                 talloc_free(tmp_ctx);
3312                 return 0;
3313         }
3314
3315
3316         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3317                 talloc_free(tmp_ctx);
3318                 return 0;
3319         }
3320
3321         talloc_free(ctdb->recovery_lock_file);
3322         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3323         ctdb->tunable.verify_recovery_lock = 0;
3324         if (ctdb->recovery_lock_fd != -1) {
3325                 close(ctdb->recovery_lock_fd);
3326                 ctdb->recovery_lock_fd = -1;
3327         }
3328
3329         talloc_free(tmp_ctx);
3330         return 0;
3331 }
3332
3333 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3334                       TALLOC_CTX *mem_ctx)
3335 {
3336         uint32_t pnn;
3337         struct ctdb_node_map *nodemap=NULL;
3338         struct ctdb_node_map *recmaster_nodemap=NULL;
3339         struct ctdb_node_map **remote_nodemaps=NULL;
3340         struct ctdb_vnn_map *vnnmap=NULL;
3341         struct ctdb_vnn_map *remote_vnnmap=NULL;
3342         int32_t debug_level;
3343         int i, j, ret;
3344         bool self_ban;
3345
3346
3347         /* verify that the main daemon is still running */
3348         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3349                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3350                 exit(-1);
3351         }
3352
3353         /* ping the local daemon to tell it we are alive */
3354         ctdb_ctrl_recd_ping(ctdb);
3355
3356         if (rec->election_timeout) {
3357                 /* an election is in progress */
3358                 return;
3359         }
3360
3361         /* read the debug level from the parent and update locally */
3362         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3363         if (ret !=0) {
3364                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3365                 return;
3366         }
3367         LogLevel = debug_level;
3368
3369         /* get relevant tunables */
3370         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3371         if (ret != 0) {
3372                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3373                 return;
3374         }
3375
3376         /* get the current recovery lock file from the server */
3377         if (update_recovery_lock_file(ctdb) != 0) {
3378                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3379                 return;
3380         }
3381
3382         /* Make sure that if recovery lock verification becomes disabled when
3383            we close the file
3384         */
3385         if (ctdb->tunable.verify_recovery_lock == 0) {
3386                 if (ctdb->recovery_lock_fd != -1) {
3387                         close(ctdb->recovery_lock_fd);
3388                         ctdb->recovery_lock_fd = -1;
3389                 }
3390         }
3391
3392         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3393         if (pnn == (uint32_t)-1) {
3394                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
3395                 return;
3396         }
3397
3398         /* get the vnnmap */
3399         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3400         if (ret != 0) {
3401                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3402                 return;
3403         }
3404
3405
3406         /* get number of nodes */
3407         if (rec->nodemap) {
3408                 talloc_free(rec->nodemap);
3409                 rec->nodemap = NULL;
3410                 nodemap=NULL;
3411         }
3412         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3413         if (ret != 0) {
3414                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3415                 return;
3416         }
3417         nodemap = rec->nodemap;
3418
3419         /* remember our own node flags */
3420         rec->node_flags = nodemap->nodes[pnn].flags;
3421
3422         ban_misbehaving_nodes(rec, &self_ban);
3423         if (self_ban) {
3424                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3425                 return;
3426         }
3427
3428         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3429            also frozen and that the recmode is set to active.
3430         */
3431         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3432                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3433                 if (ret != 0) {
3434                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3435                 }
3436                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3437                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3438
3439                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3440                         if (ret != 0) {
3441                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3442                                 return;
3443                         }
3444                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3445                         if (ret != 0) {
3446                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3447
3448                                 return;
3449                         }
3450                 }
3451
3452                 /* If this node is stopped or banned then it is not the recovery
3453                  * master, so don't do anything. This prevents stopped or banned
3454                  * node from starting election and sending unnecessary controls.
3455                  */
3456                 return;
3457         }
3458
3459         /* check which node is the recovery master */
3460         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3461         if (ret != 0) {
3462                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3463                 return;
3464         }
3465
3466         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3467         if (rec->recmaster != pnn) {
3468                 if (rec->ip_reallocate_ctx != NULL) {
3469                         talloc_free(rec->ip_reallocate_ctx);
3470                         rec->ip_reallocate_ctx = NULL;
3471                         rec->reallocate_callers = NULL;
3472                 }
3473         }
3474
3475         /* This is a special case.  When recovery daemon is started, recmaster
3476          * is set to -1.  If a node is not started in stopped state, then
3477          * start election to decide recovery master
3478          */
3479         if (rec->recmaster == (uint32_t)-1) {
3480                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3481                 force_election(rec, pnn, nodemap);
3482                 return;
3483         }
3484
3485         /* update the capabilities for all nodes */
3486         ret = update_capabilities(ctdb, nodemap);
3487         if (ret != 0) {
3488                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3489                 return;
3490         }
3491
3492         /*
3493          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3494          * but we have, then force an election and try to become the new
3495          * recmaster.
3496          */
3497         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3498             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3499              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3500                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3501                                   " but we (node %u) have - force an election\n",
3502                                   rec->recmaster, pnn));
3503                 force_election(rec, pnn, nodemap);
3504                 return;
3505         }
3506
3507         /* count how many active nodes there are */
3508         rec->num_active    = 0;
3509         rec->num_connected = 0;
3510         for (i=0; i<nodemap->num; i++) {
3511                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3512                         rec->num_active++;
3513                 }
3514                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3515                         rec->num_connected++;
3516                 }
3517         }
3518
3519
3520         /* verify that the recmaster node is still active */
3521         for (j=0; j<nodemap->num; j++) {
3522                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3523                         break;
3524                 }
3525         }
3526
3527         if (j == nodemap->num) {
3528                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3529                 force_election(rec, pnn, nodemap);
3530                 return;
3531         }
3532
3533         /* if recovery master is disconnected we must elect a new recmaster */
3534         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3535                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3536                 force_election(rec, pnn, nodemap);
3537                 return;
3538         }
3539
3540         /* get nodemap from the recovery master to check if it is inactive */
3541         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3542                                    mem_ctx, &recmaster_nodemap);
3543         if (ret != 0) {
3544                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3545                           nodemap->nodes[j].pnn));
3546                 return;
3547         }
3548
3549
3550         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3551             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3552                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3553                 /*
3554                  * update our nodemap to carry the recmaster's notion of
3555                  * its own flags, so that we don't keep freezing the
3556                  * inactive recmaster node...
3557                  */
3558                 nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
3559                 force_election(rec, pnn, nodemap);
3560                 return;
3561         }
3562
3563         /* verify that we have all ip addresses we should have and we dont
3564          * have addresses we shouldnt have.
3565          */ 
3566         if (ctdb->tunable.disable_ip_failover == 0) {
3567                 if (rec->ip_check_disable_ctx == NULL) {
3568                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3569                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3570                         }
3571                 }
3572         }
3573
3574
3575         /* if we are not the recmaster then we do not need to check
3576            if recovery is needed
3577          */
3578         if (pnn != rec->recmaster) {
3579                 return;
3580         }
3581
3582
3583         /* ensure our local copies of flags are right */
3584         ret = update_local_flags(rec, nodemap);
3585         if (ret == MONITOR_ELECTION_NEEDED) {
3586                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3587                 force_election(rec, pnn, nodemap);
3588                 return;
3589         }
3590         if (ret != MONITOR_OK) {
3591                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3592                 return;
3593         }
3594
3595         if (ctdb->num_nodes != nodemap->num) {
3596                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3597                 reload_nodes_file(ctdb);
3598                 return;
3599         }
3600
3601         /* verify that all active nodes agree that we are the recmaster */
3602         switch (verify_recmaster(rec, nodemap, pnn)) {
3603         case MONITOR_RECOVERY_NEEDED:
3604                 /* can not happen */
3605                 return;
3606         case MONITOR_ELECTION_NEEDED:
3607                 force_election(rec, pnn, nodemap);
3608                 return;
3609         case MONITOR_OK:
3610                 break;
3611         case MONITOR_FAILED:
3612                 return;
3613         }
3614
3615
3616         if (rec->need_recovery) {
3617                 /* a previous recovery didn't finish */
3618                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3619                 return;
3620         }
3621
3622         /* verify that all active nodes are in normal mode 
3623            and not in recovery mode 
3624         */
3625         switch (verify_recmode(ctdb, nodemap)) {
3626         case MONITOR_RECOVERY_NEEDED:
3627                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3628                 return;
3629         case MONITOR_FAILED:
3630                 return;
3631         case MONITOR_ELECTION_NEEDED:
3632                 /* can not happen */
3633         case MONITOR_OK:
3634                 break;
3635         }
3636
3637
3638         if (ctdb->tunable.verify_recovery_lock != 0) {
3639                 /* we should have the reclock - check its not stale */
3640                 ret = check_recovery_lock(ctdb);
3641                 if (ret != 0) {
3642                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3643                         ctdb_set_culprit(rec, ctdb->pnn);
3644                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3645                         return;
3646                 }
3647         }
3648
3649
3650         /* is there a pending reload all ips ? */
3651         if (reload_all_ips_request != NULL) {
3652                 reload_all_ips(ctdb, rec, nodemap, reload_all_ips_request);
3653                 talloc_free(reload_all_ips_request);
3654                 reload_all_ips_request = NULL;
3655         }
3656
3657         /* if there are takeovers requested, perform it and notify the waiters */
3658         if (rec->reallocate_callers) {
3659                 process_ipreallocate_requests(ctdb, rec);
3660         }
3661
3662         /* get the nodemap for all active remote nodes
3663          */
3664         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3665         if (remote_nodemaps == NULL) {
3666                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3667                 return;
3668         }
3669         for(i=0; i<nodemap->num; i++) {
3670                 remote_nodemaps[i] = NULL;
3671         }
3672         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3673                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3674                 return;
3675         } 
3676
3677         /* verify that all other nodes have the same nodemap as we have
3678         */
3679         for (j=0; j<nodemap->num; j++) {
3680                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3681                         continue;
3682                 }
3683
3684                 if (remote_nodemaps[j] == NULL) {
3685                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3686                         ctdb_set_culprit(rec, j);
3687
3688                         return;
3689                 }
3690
3691                 /* if the nodes disagree on how many nodes there are
3692                    then this is a good reason to try recovery
3693                  */
3694                 if (remote_nodemaps[j]->num != nodemap->num) {
3695                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3696                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3697                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3698                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3699                         return;
3700                 }
3701
3702                 /* if the nodes disagree on which nodes exist and are
3703                    active, then that is also a good reason to do recovery
3704                  */
3705                 for (i=0;i<nodemap->num;i++) {
3706                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3707                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3708                                           nodemap->nodes[j].pnn, i, 
3709                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3710                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3711                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3712                                             vnnmap);
3713                                 return;
3714                         }
3715                 }
3716
3717                 /* verify the flags are consistent
3718                 */
3719                 for (i=0; i<nodemap->num; i++) {
3720                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3721                                 continue;
3722                         }
3723                         
3724                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3725                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3726                                   nodemap->nodes[j].pnn, 
3727                                   nodemap->nodes[i].pnn, 
3728                                   remote_nodemaps[j]->nodes[i].flags,
3729                                   nodemap->nodes[i].flags));
3730                                 if (i == j) {
3731                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3732                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3733                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3734                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3735                                                     vnnmap);
3736                                         return;
3737                                 } else {
3738                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3739                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3740                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3741                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3742                                                     vnnmap);
3743                                         return;
3744                                 }
3745                         }
3746                 }
3747         }
3748
3749
3750         /* there better be the same number of lmasters in the vnn map
3751            as there are active nodes or we will have to do a recovery
3752          */
3753         if (vnnmap->size != rec->num_active) {
3754                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3755                           vnnmap->size, rec->num_active));
3756                 ctdb_set_culprit(rec, ctdb->pnn);
3757                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3758                 return;
3759         }
3760
3761         /* verify that all active nodes in the nodemap also exist in 
3762            the vnnmap.
3763          */
3764         for (j=0; j<nodemap->num; j++) {
3765                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3766                         continue;
3767                 }
3768                 if (nodemap->nodes[j].pnn == pnn) {
3769                         continue;
3770                 }
3771
3772                 for (i=0; i<vnnmap->size; i++) {
3773                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3774                                 break;
3775                         }
3776                 }
3777                 if (i == vnnmap->size) {
3778                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3779                                   nodemap->nodes[j].pnn));
3780                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3781                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3782                         return;
3783                 }
3784         }
3785
3786         
3787         /* verify that all other nodes have the same vnnmap
3788            and are from the same generation
3789          */
3790         for (j=0; j<nodemap->num; j++) {
3791                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3792                         continue;
3793                 }
3794                 if (nodemap->nodes[j].pnn == pnn) {
3795                         continue;
3796                 }
3797
3798                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3799                                           mem_ctx, &remote_vnnmap);
3800                 if (ret != 0) {
3801                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3802                                   nodemap->nodes[j].pnn));
3803                         return;
3804                 }
3805
3806                 /* verify the vnnmap generation is the same */
3807                 if (vnnmap->generation != remote_vnnmap->generation) {
3808                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3809                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3810                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3811                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3812                         return;
3813                 }
3814
3815                 /* verify the vnnmap size is the same */
3816                 if (vnnmap->size != remote_vnnmap->size) {
3817                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3818                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3819                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3820                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3821                         return;
3822                 }
3823
3824                 /* verify the vnnmap is the same */
3825                 for (i=0;i<vnnmap->size;i++) {
3826                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3827                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3828                                           nodemap->nodes[j].pnn));
3829                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3830                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3831                                             vnnmap);
3832                                 return;
3833                         }
3834                 }
3835         }
3836
3837         /* we might need to change who has what IP assigned */
3838         if (rec->need_takeover_run) {
3839                 uint32_t culprit = (uint32_t)-1;
3840
3841                 rec->need_takeover_run = false;
3842
3843                 /* update the list of public ips that a node can handle for
3844                    all connected nodes
3845                 */
3846                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3847                 if (ret != 0) {
3848                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3849                                          culprit));
3850                         rec->need_takeover_run = true;
3851                         return;
3852                 }
3853
3854                 /* execute the "startrecovery" event script on all nodes */
3855                 ret = run_startrecovery_eventscript(rec, nodemap);
3856                 if (ret!=0) {
3857                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3858                         ctdb_set_culprit(rec, ctdb->pnn);
3859                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3860                         return;
3861                 }
3862
3863                 /* If takeover run fails, then the offending nodes are
3864                  * assigned ban culprit counts. And we re-try takeover.
3865                  * If takeover run fails repeatedly, the node would get
3866                  * banned.
3867                  *
3868                  * If rec->need_takeover_run is not set to true at this
3869                  * failure, monitoring is disabled cluster-wide (via
3870                  * startrecovery eventscript) and will not get enabled.
3871                  */
3872                 ret = ctdb_takeover_run(ctdb, nodemap, takeover_fail_callback, rec);
3873                 if (ret != 0) {
3874                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Trying again\n"));
3875                         return;
3876                 }
3877
3878                 /* execute the "recovered" event script on all nodes */
3879                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
3880 #if 0
3881 // we cant check whether the event completed successfully
3882 // since this script WILL fail if the node is in recovery mode
3883 // and if that race happens, the code here would just cause a second
3884 // cascading recovery.
3885                 if (ret!=0) {
3886                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3887                         ctdb_set_culprit(rec, ctdb->pnn);
3888                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3889                 }
3890 #endif
3891         }
3892 }
3893
3894 /*
3895   the main monitoring loop
3896  */
3897 static void monitor_cluster(struct ctdb_context *ctdb)
3898 {
3899         struct ctdb_recoverd *rec;
3900
3901         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3902
3903         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3904         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3905
3906         rec->ctdb = ctdb;
3907
3908         rec->priority_time = timeval_current();
3909
3910         /* register a message port for sending memory dumps */
3911         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3912
3913         /* register a message port for requesting logs */
3914         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_GETLOG, getlog_handler, rec);
3915
3916         /* register a message port for clearing logs */
3917         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_CLEARLOG, clearlog_handler, rec);
3918
3919         /* register a message port for recovery elections */
3920         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3921
3922         /* when nodes are disabled/enabled */
3923         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3924
3925         /* when we are asked to puch out a flag change */
3926         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3927
3928         /* register a message port for vacuum fetch */
3929         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3930
3931         /* register a message port for reloadnodes  */
3932         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3933
3934         /* register a message port for performing a takeover run */
3935         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3936
3937         /* register a message port for performing a reload all ips */
3938         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_ALL_IPS, ip_reloadall_handler, rec);
3939
3940         /* register a message port for disabling the ip check for a short while */
3941         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3942
3943         /* register a message port for updating the recovery daemons node assignment for an ip */
3944         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3945
3946         /* register a message port for forcing a rebalance of a node next
3947            reallocation */
3948         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3949
3950         for (;;) {
3951                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3952                 struct timeval start;
3953                 double elapsed;
3954
3955                 if (!mem_ctx) {
3956                         DEBUG(DEBUG_CRIT,(__location__
3957                                           " Failed to create temp context\n"));
3958                         exit(-1);
3959                 }
3960
3961                 start = timeval_current();
3962                 main_loop(ctdb, rec, mem_ctx);
3963                 talloc_free(mem_ctx);
3964
3965                 /* we only check for recovery once every second */
3966                 elapsed = timeval_elapsed(&start);
3967                 if (elapsed < ctdb->tunable.recover_interval) {
3968                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3969                                           - elapsed);
3970                 }
3971         }
3972 }
3973
3974 /*
3975   event handler for when the main ctdbd dies
3976  */
3977 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3978                                  uint16_t flags, void *private_data)
3979 {
3980         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3981         _exit(1);
3982 }
3983
3984 /*
3985   called regularly to verify that the recovery daemon is still running
3986  */
3987 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3988                               struct timeval yt, void *p)
3989 {
3990         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3991
3992         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3993                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3994
3995                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3996                                 ctdb_restart_recd, ctdb);
3997
3998                 return;
3999         }
4000
4001         event_add_timed(ctdb->ev, ctdb->recd_ctx,
4002                         timeval_current_ofs(30, 0),
4003                         ctdb_check_recd, ctdb);
4004 }
4005
4006 static void recd_sig_child_handler(struct event_context *ev,
4007         struct signal_event *se, int signum, int count,
4008         void *dont_care, 
4009         void *private_data)
4010 {
4011 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4012         int status;
4013         pid_t pid = -1;
4014
4015         while (pid != 0) {
4016                 pid = waitpid(-1, &status, WNOHANG);
4017                 if (pid == -1) {
4018                         if (errno != ECHILD) {
4019                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4020                         }
4021                         return;
4022                 }
4023                 if (pid > 0) {
4024                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4025                 }
4026         }
4027 }
4028
4029 /*
4030   startup the recovery daemon as a child of the main ctdb daemon
4031  */
4032 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4033 {
4034         int fd[2];
4035         struct signal_event *se;
4036         struct tevent_fd *fde;
4037
4038         if (pipe(fd) != 0) {
4039                 return -1;
4040         }
4041
4042         ctdb->ctdbd_pid = getpid();
4043
4044         ctdb->recoverd_pid = ctdb_fork_no_free_ringbuffer(ctdb);
4045         if (ctdb->recoverd_pid == -1) {
4046                 return -1;
4047         }
4048
4049         if (ctdb->recoverd_pid != 0) {
4050                 talloc_free(ctdb->recd_ctx);
4051                 ctdb->recd_ctx = talloc_new(ctdb);
4052                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4053
4054                 close(fd[0]);
4055                 event_add_timed(ctdb->ev, ctdb->recd_ctx,
4056                                 timeval_current_ofs(30, 0),
4057                                 ctdb_check_recd, ctdb);
4058                 return 0;
4059         }
4060
4061         close(fd[1]);
4062
4063         srandom(getpid() ^ time(NULL));
4064
4065         /* Clear the log ringbuffer */
4066         ctdb_clear_log(ctdb);
4067
4068         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4069                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
4070                 exit(1);
4071         }
4072
4073         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4074
4075         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
4076                      ctdb_recoverd_parent, &fd[0]);
4077         tevent_fd_set_auto_close(fde);
4078
4079         /* set up a handler to pick up sigchld */
4080         se = event_add_signal(ctdb->ev, ctdb,
4081                                      SIGCHLD, 0,
4082                                      recd_sig_child_handler,
4083                                      ctdb);
4084         if (se == NULL) {
4085                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4086                 exit(1);
4087         }
4088
4089         monitor_cluster(ctdb);
4090
4091         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4092         return -1;
4093 }
4094
4095 /*
4096   shutdown the recovery daemon
4097  */
4098 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4099 {
4100         if (ctdb->recoverd_pid == 0) {
4101                 return;
4102         }
4103
4104         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4105         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4106
4107         TALLOC_FREE(ctdb->recd_ctx);
4108         TALLOC_FREE(ctdb->recd_ping_count);
4109 }
4110
4111 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
4112                        struct timeval t, void *private_data)
4113 {
4114         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4115
4116         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4117         ctdb_stop_recoverd(ctdb);
4118         ctdb_start_recoverd(ctdb);
4119 }