recoverd: Remove an unused temporary talloc context
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "dlinklist.h"
31
32
33 /* most recent reload all ips request we need to perform during the 
34    next monitoring loop
35 */
36 struct reloadips_all_reply *reload_all_ips_request = NULL;
37
38 /* list of "ctdb ipreallocate" processes to call back when we have
39    finished the takeover run.
40 */
41 struct ip_reallocate_list {
42         struct ip_reallocate_list *next;
43         struct rd_memdump_reply *rd;
44 };
45
46 struct ctdb_banning_state {
47         uint32_t count;
48         struct timeval last_reported_time;
49 };
50
51 /*
52   private state of recovery daemon
53  */
54 struct ctdb_recoverd {
55         struct ctdb_context *ctdb;
56         uint32_t recmaster;
57         uint32_t num_active;
58         uint32_t num_connected;
59         uint32_t last_culprit_node;
60         struct ctdb_node_map *nodemap;
61         struct timeval priority_time;
62         bool need_takeover_run;
63         bool need_recovery;
64         uint32_t node_flags;
65         struct timed_event *send_election_te;
66         struct timed_event *election_timeout;
67         struct vacuum_info *vacuum_info;
68         TALLOC_CTX *ip_reallocate_ctx;
69         struct ip_reallocate_list *reallocate_callers;
70         TALLOC_CTX *ip_check_disable_ctx;
71         struct ctdb_control_get_ifaces *ifaces;
72         TALLOC_CTX *deferred_rebalance_ctx;
73 };
74
75 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
76 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
77
78 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
79
80 /*
81   ban a node for a period of time
82  */
83 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
84 {
85         int ret;
86         struct ctdb_context *ctdb = rec->ctdb;
87         struct ctdb_ban_time bantime;
88        
89         if (!ctdb_validate_pnn(ctdb, pnn)) {
90                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
91                 return;
92         }
93
94         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
95
96         bantime.pnn  = pnn;
97         bantime.time = ban_time;
98
99         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
100         if (ret != 0) {
101                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
102                 return;
103         }
104
105 }
106
107 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
108
109
110 /*
111   remember the trouble maker
112  */
113 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
114 {
115         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
116         struct ctdb_banning_state *ban_state;
117
118         if (culprit > ctdb->num_nodes) {
119                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
120                 return;
121         }
122
123         /* If we are banned or stopped, do not set other nodes as culprits */
124         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
125                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
126                 return;
127         }
128
129         if (ctdb->nodes[culprit]->ban_state == NULL) {
130                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
131                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
132
133                 
134         }
135         ban_state = ctdb->nodes[culprit]->ban_state;
136         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
137                 /* this was the first time in a long while this node
138                    misbehaved so we will forgive any old transgressions.
139                 */
140                 ban_state->count = 0;
141         }
142
143         ban_state->count += count;
144         ban_state->last_reported_time = timeval_current();
145         rec->last_culprit_node = culprit;
146 }
147
148 /*
149   remember the trouble maker
150  */
151 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
152 {
153         ctdb_set_culprit_count(rec, culprit, 1);
154 }
155
156
157 /* this callback is called for every node that failed to execute the
158    recovered event
159 */
160 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
161 {
162         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
163
164         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
165
166         ctdb_set_culprit(rec, node_pnn);
167 }
168
169 /*
170   run the "recovered" eventscript on all nodes
171  */
172 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
173 {
174         TALLOC_CTX *tmp_ctx;
175         uint32_t *nodes;
176         struct ctdb_context *ctdb = rec->ctdb;
177
178         tmp_ctx = talloc_new(ctdb);
179         CTDB_NO_MEMORY(ctdb, tmp_ctx);
180
181         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
182         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
183                                         nodes, 0,
184                                         CONTROL_TIMEOUT(), false, tdb_null,
185                                         NULL, recovered_fail_callback,
186                                         rec) != 0) {
187                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
188
189                 talloc_free(tmp_ctx);
190                 return -1;
191         }
192
193         talloc_free(tmp_ctx);
194         return 0;
195 }
196
197 /* this callback is called for every node that failed to execute the
198    start recovery event
199 */
200 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
201 {
202         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
203
204         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
205
206         ctdb_set_culprit(rec, node_pnn);
207 }
208
209 /*
210   run the "startrecovery" eventscript on all nodes
211  */
212 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
213 {
214         TALLOC_CTX *tmp_ctx;
215         uint32_t *nodes;
216         struct ctdb_context *ctdb = rec->ctdb;
217
218         tmp_ctx = talloc_new(ctdb);
219         CTDB_NO_MEMORY(ctdb, tmp_ctx);
220
221         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
222         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
223                                         nodes, 0,
224                                         CONTROL_TIMEOUT(), false, tdb_null,
225                                         NULL,
226                                         startrecovery_fail_callback,
227                                         rec) != 0) {
228                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
229                 talloc_free(tmp_ctx);
230                 return -1;
231         }
232
233         talloc_free(tmp_ctx);
234         return 0;
235 }
236
237 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
238 {
239         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
240                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
241                 return;
242         }
243         if (node_pnn < ctdb->num_nodes) {
244                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
245         }
246
247         if (node_pnn == ctdb->pnn) {
248                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
249         }
250 }
251
252 /*
253   update the node capabilities for all connected nodes
254  */
255 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
256 {
257         uint32_t *nodes;
258         TALLOC_CTX *tmp_ctx;
259
260         tmp_ctx = talloc_new(ctdb);
261         CTDB_NO_MEMORY(ctdb, tmp_ctx);
262
263         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
264         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
265                                         nodes, 0,
266                                         CONTROL_TIMEOUT(),
267                                         false, tdb_null,
268                                         async_getcap_callback, NULL,
269                                         NULL) != 0) {
270                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
271                 talloc_free(tmp_ctx);
272                 return -1;
273         }
274
275         talloc_free(tmp_ctx);
276         return 0;
277 }
278
279 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
280 {
281         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
282
283         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
284         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
285 }
286
287 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
288 {
289         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
290
291         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
292         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
293 }
294
295 /*
296   change recovery mode on all nodes
297  */
298 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
299 {
300         TDB_DATA data;
301         uint32_t *nodes;
302         TALLOC_CTX *tmp_ctx;
303
304         tmp_ctx = talloc_new(ctdb);
305         CTDB_NO_MEMORY(ctdb, tmp_ctx);
306
307         /* freeze all nodes */
308         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
309         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
310                 int i;
311
312                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
313                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
314                                                 nodes, i,
315                                                 CONTROL_TIMEOUT(),
316                                                 false, tdb_null,
317                                                 NULL,
318                                                 set_recmode_fail_callback,
319                                                 rec) != 0) {
320                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
321                                 talloc_free(tmp_ctx);
322                                 return -1;
323                         }
324                 }
325         }
326
327
328         data.dsize = sizeof(uint32_t);
329         data.dptr = (unsigned char *)&rec_mode;
330
331         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
332                                         nodes, 0,
333                                         CONTROL_TIMEOUT(),
334                                         false, data,
335                                         NULL, NULL,
336                                         NULL) != 0) {
337                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
338                 talloc_free(tmp_ctx);
339                 return -1;
340         }
341
342         talloc_free(tmp_ctx);
343         return 0;
344 }
345
346 /*
347   change recovery master on all node
348  */
349 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
350 {
351         TDB_DATA data;
352         TALLOC_CTX *tmp_ctx;
353         uint32_t *nodes;
354
355         tmp_ctx = talloc_new(ctdb);
356         CTDB_NO_MEMORY(ctdb, tmp_ctx);
357
358         data.dsize = sizeof(uint32_t);
359         data.dptr = (unsigned char *)&pnn;
360
361         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
362         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
363                                         nodes, 0,
364                                         CONTROL_TIMEOUT(), false, data,
365                                         NULL, NULL,
366                                         NULL) != 0) {
367                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
368                 talloc_free(tmp_ctx);
369                 return -1;
370         }
371
372         talloc_free(tmp_ctx);
373         return 0;
374 }
375
376 /* update all remote nodes to use the same db priority that we have
377    this can fail if the remove node has not yet been upgraded to 
378    support this function, so we always return success and never fail
379    a recovery if this call fails.
380 */
381 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
382         struct ctdb_node_map *nodemap, 
383         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
384 {
385         int db;
386         uint32_t *nodes;
387
388         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
389
390         /* step through all local databases */
391         for (db=0; db<dbmap->num;db++) {
392                 TDB_DATA data;
393                 struct ctdb_db_priority db_prio;
394                 int ret;
395
396                 db_prio.db_id     = dbmap->dbs[db].dbid;
397                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
398                 if (ret != 0) {
399                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
400                         continue;
401                 }
402
403                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
404
405                 data.dptr  = (uint8_t *)&db_prio;
406                 data.dsize = sizeof(db_prio);
407
408                 if (ctdb_client_async_control(ctdb,
409                                         CTDB_CONTROL_SET_DB_PRIORITY,
410                                         nodes, 0,
411                                         CONTROL_TIMEOUT(), false, data,
412                                         NULL, NULL,
413                                         NULL) != 0) {
414                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
415                 }
416         }
417
418         return 0;
419 }                       
420
421 /*
422   ensure all other nodes have attached to any databases that we have
423  */
424 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
425                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
426 {
427         int i, j, db, ret;
428         struct ctdb_dbid_map *remote_dbmap;
429
430         /* verify that all other nodes have all our databases */
431         for (j=0; j<nodemap->num; j++) {
432                 /* we dont need to ourself ourselves */
433                 if (nodemap->nodes[j].pnn == pnn) {
434                         continue;
435                 }
436                 /* dont check nodes that are unavailable */
437                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
438                         continue;
439                 }
440
441                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
442                                          mem_ctx, &remote_dbmap);
443                 if (ret != 0) {
444                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
445                         return -1;
446                 }
447
448                 /* step through all local databases */
449                 for (db=0; db<dbmap->num;db++) {
450                         const char *name;
451
452
453                         for (i=0;i<remote_dbmap->num;i++) {
454                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
455                                         break;
456                                 }
457                         }
458                         /* the remote node already have this database */
459                         if (i!=remote_dbmap->num) {
460                                 continue;
461                         }
462                         /* ok so we need to create this database */
463                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
464                                             mem_ctx, &name);
465                         if (ret != 0) {
466                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
467                                 return -1;
468                         }
469                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
470                                            mem_ctx, name,
471                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
472                         if (ret != 0) {
473                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
474                                 return -1;
475                         }
476                 }
477         }
478
479         return 0;
480 }
481
482
483 /*
484   ensure we are attached to any databases that anyone else is attached to
485  */
486 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
487                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
488 {
489         int i, j, db, ret;
490         struct ctdb_dbid_map *remote_dbmap;
491
492         /* verify that we have all database any other node has */
493         for (j=0; j<nodemap->num; j++) {
494                 /* we dont need to ourself ourselves */
495                 if (nodemap->nodes[j].pnn == pnn) {
496                         continue;
497                 }
498                 /* dont check nodes that are unavailable */
499                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
500                         continue;
501                 }
502
503                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
504                                          mem_ctx, &remote_dbmap);
505                 if (ret != 0) {
506                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
507                         return -1;
508                 }
509
510                 /* step through all databases on the remote node */
511                 for (db=0; db<remote_dbmap->num;db++) {
512                         const char *name;
513
514                         for (i=0;i<(*dbmap)->num;i++) {
515                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
516                                         break;
517                                 }
518                         }
519                         /* we already have this db locally */
520                         if (i!=(*dbmap)->num) {
521                                 continue;
522                         }
523                         /* ok so we need to create this database and
524                            rebuild dbmap
525                          */
526                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
527                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
528                         if (ret != 0) {
529                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
530                                           nodemap->nodes[j].pnn));
531                                 return -1;
532                         }
533                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
534                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
535                         if (ret != 0) {
536                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
537                                 return -1;
538                         }
539                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
540                         if (ret != 0) {
541                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
542                                 return -1;
543                         }
544                 }
545         }
546
547         return 0;
548 }
549
550
551 /*
552   pull the remote database contents from one node into the recdb
553  */
554 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
555                                     struct tdb_wrap *recdb, uint32_t dbid)
556 {
557         int ret;
558         TDB_DATA outdata;
559         struct ctdb_marshall_buffer *reply;
560         struct ctdb_rec_data *rec;
561         int i;
562         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
563
564         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
565                                CONTROL_TIMEOUT(), &outdata);
566         if (ret != 0) {
567                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
568                 talloc_free(tmp_ctx);
569                 return -1;
570         }
571
572         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
573
574         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
575                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
576                 talloc_free(tmp_ctx);
577                 return -1;
578         }
579         
580         rec = (struct ctdb_rec_data *)&reply->data[0];
581         
582         for (i=0;
583              i<reply->count;
584              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
585                 TDB_DATA key, data;
586                 struct ctdb_ltdb_header *hdr;
587                 TDB_DATA existing;
588                 
589                 key.dptr = &rec->data[0];
590                 key.dsize = rec->keylen;
591                 data.dptr = &rec->data[key.dsize];
592                 data.dsize = rec->datalen;
593                 
594                 hdr = (struct ctdb_ltdb_header *)data.dptr;
595
596                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
597                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
598                         talloc_free(tmp_ctx);
599                         return -1;
600                 }
601
602                 /* fetch the existing record, if any */
603                 existing = tdb_fetch(recdb->tdb, key);
604                 
605                 if (existing.dptr != NULL) {
606                         struct ctdb_ltdb_header header;
607                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
608                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
609                                          (unsigned)existing.dsize, srcnode));
610                                 free(existing.dptr);
611                                 talloc_free(tmp_ctx);
612                                 return -1;
613                         }
614                         header = *(struct ctdb_ltdb_header *)existing.dptr;
615                         free(existing.dptr);
616                         if (!(header.rsn < hdr->rsn ||
617                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
618                                 continue;
619                         }
620                 }
621                 
622                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
623                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
624                         talloc_free(tmp_ctx);
625                         return -1;                              
626                 }
627         }
628
629         talloc_free(tmp_ctx);
630
631         return 0;
632 }
633
634
635 struct pull_seqnum_cbdata {
636         int failed;
637         uint32_t pnn;
638         uint64_t seqnum;
639 };
640
641 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
642 {
643         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
644         uint64_t seqnum;
645
646         if (cb_data->failed != 0) {
647                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
648                 return;
649         }
650
651         if (res != 0) {
652                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
653                 cb_data->failed = 1;
654                 return;
655         }
656
657         if (outdata.dsize != sizeof(uint64_t)) {
658                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
659                 cb_data->failed = -1;
660                 return;
661         }
662
663         seqnum = *((uint64_t *)outdata.dptr);
664
665         if (seqnum > cb_data->seqnum) {
666                 cb_data->seqnum = seqnum;
667                 cb_data->pnn = node_pnn;
668         }
669 }
670
671 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
672 {
673         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
674
675         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
676         cb_data->failed = 1;
677 }
678
679 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
680                                 struct ctdb_recoverd *rec, 
681                                 struct ctdb_node_map *nodemap, 
682                                 struct tdb_wrap *recdb, uint32_t dbid)
683 {
684         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
685         uint32_t *nodes;
686         TDB_DATA data;
687         uint32_t outdata[2];
688         struct pull_seqnum_cbdata *cb_data;
689
690         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
691
692         outdata[0] = dbid;
693         outdata[1] = 0;
694
695         data.dsize = sizeof(outdata);
696         data.dptr  = (uint8_t *)&outdata[0];
697
698         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
699         if (cb_data == NULL) {
700                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
701                 talloc_free(tmp_ctx);
702                 return -1;
703         }
704
705         cb_data->failed = 0;
706         cb_data->pnn    = -1;
707         cb_data->seqnum = 0;
708         
709         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
710         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
711                                         nodes, 0,
712                                         CONTROL_TIMEOUT(), false, data,
713                                         pull_seqnum_cb,
714                                         pull_seqnum_fail_cb,
715                                         cb_data) != 0) {
716                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
717
718                 talloc_free(tmp_ctx);
719                 return -1;
720         }
721
722         if (cb_data->failed != 0) {
723                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
724                 talloc_free(tmp_ctx);
725                 return -1;
726         }
727
728         if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
729                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
730                 talloc_free(tmp_ctx);
731                 return -1;
732         }
733
734         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
735
736         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
737                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
738                 talloc_free(tmp_ctx);
739                 return -1;
740         }
741
742         talloc_free(tmp_ctx);
743         return 0;
744 }
745
746
747 /*
748   pull all the remote database contents into the recdb
749  */
750 static int pull_remote_database(struct ctdb_context *ctdb,
751                                 struct ctdb_recoverd *rec, 
752                                 struct ctdb_node_map *nodemap, 
753                                 struct tdb_wrap *recdb, uint32_t dbid,
754                                 bool persistent)
755 {
756         int j;
757
758         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
759                 int ret;
760                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
761                 if (ret == 0) {
762                         return 0;
763                 }
764         }
765
766         /* pull all records from all other nodes across onto this node
767            (this merges based on rsn)
768         */
769         for (j=0; j<nodemap->num; j++) {
770                 /* dont merge from nodes that are unavailable */
771                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
772                         continue;
773                 }
774                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
775                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
776                                  nodemap->nodes[j].pnn));
777                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
778                         return -1;
779                 }
780         }
781         
782         return 0;
783 }
784
785
786 /*
787   update flags on all active nodes
788  */
789 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
790 {
791         int ret;
792
793         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
794                 if (ret != 0) {
795                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
796                 return -1;
797         }
798
799         return 0;
800 }
801
802 /*
803   ensure all nodes have the same vnnmap we do
804  */
805 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
806                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
807 {
808         int j, ret;
809
810         /* push the new vnn map out to all the nodes */
811         for (j=0; j<nodemap->num; j++) {
812                 /* dont push to nodes that are unavailable */
813                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
814                         continue;
815                 }
816
817                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
818                 if (ret != 0) {
819                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
820                         return -1;
821                 }
822         }
823
824         return 0;
825 }
826
827
828 struct vacuum_info {
829         struct vacuum_info *next, *prev;
830         struct ctdb_recoverd *rec;
831         uint32_t srcnode;
832         struct ctdb_db_context *ctdb_db;
833         struct ctdb_marshall_buffer *recs;
834         struct ctdb_rec_data *r;
835 };
836
837 static void vacuum_fetch_next(struct vacuum_info *v);
838
839 /*
840   called when a vacuum fetch has completed - just free it and do the next one
841  */
842 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
843 {
844         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
845         talloc_free(state);
846         vacuum_fetch_next(v);
847 }
848
849
850 /*
851   process the next element from the vacuum list
852 */
853 static void vacuum_fetch_next(struct vacuum_info *v)
854 {
855         struct ctdb_call call;
856         struct ctdb_rec_data *r;
857
858         while (v->recs->count) {
859                 struct ctdb_client_call_state *state;
860                 TDB_DATA data;
861                 struct ctdb_ltdb_header *hdr;
862
863                 ZERO_STRUCT(call);
864                 call.call_id = CTDB_NULL_FUNC;
865                 call.flags = CTDB_IMMEDIATE_MIGRATION;
866                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
867
868                 r = v->r;
869                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
870                 v->recs->count--;
871
872                 call.key.dptr = &r->data[0];
873                 call.key.dsize = r->keylen;
874
875                 /* ensure we don't block this daemon - just skip a record if we can't get
876                    the chainlock */
877                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
878                         continue;
879                 }
880
881                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
882                 if (data.dptr == NULL) {
883                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
884                         continue;
885                 }
886
887                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
888                         free(data.dptr);
889                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
890                         continue;
891                 }
892                 
893                 hdr = (struct ctdb_ltdb_header *)data.dptr;
894                 if (hdr->dmaster == v->rec->ctdb->pnn) {
895                         /* its already local */
896                         free(data.dptr);
897                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
898                         continue;
899                 }
900
901                 free(data.dptr);
902
903                 state = ctdb_call_send(v->ctdb_db, &call);
904                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
905                 if (state == NULL) {
906                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
907                         talloc_free(v);
908                         return;
909                 }
910                 state->async.fn = vacuum_fetch_callback;
911                 state->async.private_data = v;
912                 return;
913         }
914
915         talloc_free(v);
916 }
917
918
919 /*
920   destroy a vacuum info structure
921  */
922 static int vacuum_info_destructor(struct vacuum_info *v)
923 {
924         DLIST_REMOVE(v->rec->vacuum_info, v);
925         return 0;
926 }
927
928
929 /*
930   handler for vacuum fetch
931 */
932 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
933                                  TDB_DATA data, void *private_data)
934 {
935         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
936         struct ctdb_marshall_buffer *recs;
937         int ret, i;
938         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
939         const char *name;
940         struct ctdb_dbid_map *dbmap=NULL;
941         bool persistent = false;
942         struct ctdb_db_context *ctdb_db;
943         struct ctdb_rec_data *r;
944         uint32_t srcnode;
945         struct vacuum_info *v;
946
947         recs = (struct ctdb_marshall_buffer *)data.dptr;
948         r = (struct ctdb_rec_data *)&recs->data[0];
949
950         if (recs->count == 0) {
951                 talloc_free(tmp_ctx);
952                 return;
953         }
954
955         srcnode = r->reqid;
956
957         for (v=rec->vacuum_info;v;v=v->next) {
958                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
959                         /* we're already working on records from this node */
960                         talloc_free(tmp_ctx);
961                         return;
962                 }
963         }
964
965         /* work out if the database is persistent */
966         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
967         if (ret != 0) {
968                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
969                 talloc_free(tmp_ctx);
970                 return;
971         }
972
973         for (i=0;i<dbmap->num;i++) {
974                 if (dbmap->dbs[i].dbid == recs->db_id) {
975                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
976                         break;
977                 }
978         }
979         if (i == dbmap->num) {
980                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
981                 talloc_free(tmp_ctx);
982                 return;         
983         }
984
985         /* find the name of this database */
986         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
987                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
988                 talloc_free(tmp_ctx);
989                 return;
990         }
991
992         /* attach to it */
993         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
994         if (ctdb_db == NULL) {
995                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
996                 talloc_free(tmp_ctx);
997                 return;
998         }
999
1000         v = talloc_zero(rec, struct vacuum_info);
1001         if (v == NULL) {
1002                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1003                 talloc_free(tmp_ctx);
1004                 return;
1005         }
1006
1007         v->rec = rec;
1008         v->srcnode = srcnode;
1009         v->ctdb_db = ctdb_db;
1010         v->recs = talloc_memdup(v, recs, data.dsize);
1011         if (v->recs == NULL) {
1012                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1013                 talloc_free(v);
1014                 talloc_free(tmp_ctx);
1015                 return;         
1016         }
1017         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
1018
1019         DLIST_ADD(rec->vacuum_info, v);
1020
1021         talloc_set_destructor(v, vacuum_info_destructor);
1022
1023         vacuum_fetch_next(v);
1024         talloc_free(tmp_ctx);
1025 }
1026
1027
1028 /*
1029   called when ctdb_wait_timeout should finish
1030  */
1031 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1032                               struct timeval yt, void *p)
1033 {
1034         uint32_t *timed_out = (uint32_t *)p;
1035         (*timed_out) = 1;
1036 }
1037
1038 /*
1039   wait for a given number of seconds
1040  */
1041 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1042 {
1043         uint32_t timed_out = 0;
1044         time_t usecs = (secs - (time_t)secs) * 1000000;
1045         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1046         while (!timed_out) {
1047                 event_loop_once(ctdb->ev);
1048         }
1049 }
1050
1051 /*
1052   called when an election times out (ends)
1053  */
1054 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1055                                   struct timeval t, void *p)
1056 {
1057         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1058         rec->election_timeout = NULL;
1059         fast_start = false;
1060
1061         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1062 }
1063
1064
1065 /*
1066   wait for an election to finish. It finished election_timeout seconds after
1067   the last election packet is received
1068  */
1069 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1070 {
1071         struct ctdb_context *ctdb = rec->ctdb;
1072         while (rec->election_timeout) {
1073                 event_loop_once(ctdb->ev);
1074         }
1075 }
1076
1077 /*
1078   Update our local flags from all remote connected nodes. 
1079   This is only run when we are or we belive we are the recovery master
1080  */
1081 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1082 {
1083         int j;
1084         struct ctdb_context *ctdb = rec->ctdb;
1085         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1086
1087         /* get the nodemap for all active remote nodes and verify
1088            they are the same as for this node
1089          */
1090         for (j=0; j<nodemap->num; j++) {
1091                 struct ctdb_node_map *remote_nodemap=NULL;
1092                 int ret;
1093
1094                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1095                         continue;
1096                 }
1097                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1098                         continue;
1099                 }
1100
1101                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1102                                            mem_ctx, &remote_nodemap);
1103                 if (ret != 0) {
1104                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1105                                   nodemap->nodes[j].pnn));
1106                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1107                         talloc_free(mem_ctx);
1108                         return MONITOR_FAILED;
1109                 }
1110                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1111                         /* We should tell our daemon about this so it
1112                            updates its flags or else we will log the same 
1113                            message again in the next iteration of recovery.
1114                            Since we are the recovery master we can just as
1115                            well update the flags on all nodes.
1116                         */
1117                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1118                         if (ret != 0) {
1119                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1120                                 return -1;
1121                         }
1122
1123                         /* Update our local copy of the flags in the recovery
1124                            daemon.
1125                         */
1126                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1127                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1128                                  nodemap->nodes[j].flags));
1129                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1130                 }
1131                 talloc_free(remote_nodemap);
1132         }
1133         talloc_free(mem_ctx);
1134         return MONITOR_OK;
1135 }
1136
1137
1138 /* Create a new random generation ip. 
1139    The generation id can not be the INVALID_GENERATION id
1140 */
1141 static uint32_t new_generation(void)
1142 {
1143         uint32_t generation;
1144
1145         while (1) {
1146                 generation = random();
1147
1148                 if (generation != INVALID_GENERATION) {
1149                         break;
1150                 }
1151         }
1152
1153         return generation;
1154 }
1155
1156
1157 /*
1158   create a temporary working database
1159  */
1160 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1161 {
1162         char *name;
1163         struct tdb_wrap *recdb;
1164         unsigned tdb_flags;
1165
1166         /* open up the temporary recovery database */
1167         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1168                                ctdb->db_directory_state,
1169                                ctdb->pnn);
1170         if (name == NULL) {
1171                 return NULL;
1172         }
1173         unlink(name);
1174
1175         tdb_flags = TDB_NOLOCK;
1176         if (ctdb->valgrinding) {
1177                 tdb_flags |= TDB_NOMMAP;
1178         }
1179         tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
1180
1181         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1182                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1183         if (recdb == NULL) {
1184                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1185         }
1186
1187         talloc_free(name);
1188
1189         return recdb;
1190 }
1191
1192
1193 /* 
1194    a traverse function for pulling all relevant records from recdb
1195  */
1196 struct recdb_data {
1197         struct ctdb_context *ctdb;
1198         struct ctdb_marshall_buffer *recdata;
1199         uint32_t len;
1200         uint32_t allocated_len;
1201         bool failed;
1202         bool persistent;
1203 };
1204
1205 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1206 {
1207         struct recdb_data *params = (struct recdb_data *)p;
1208         struct ctdb_rec_data *rec;
1209         struct ctdb_ltdb_header *hdr;
1210
1211         /*
1212          * skip empty records - but NOT for persistent databases:
1213          *
1214          * The record-by-record mode of recovery deletes empty records.
1215          * For persistent databases, this can lead to data corruption
1216          * by deleting records that should be there:
1217          *
1218          * - Assume the cluster has been running for a while.
1219          *
1220          * - A record R in a persistent database has been created and
1221          *   deleted a couple of times, the last operation being deletion,
1222          *   leaving an empty record with a high RSN, say 10.
1223          *
1224          * - Now a node N is turned off.
1225          *
1226          * - This leaves the local database copy of D on N with the empty
1227          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1228          *   the copy of record R.
1229          *
1230          * - Now the record is created again while node N is turned off.
1231          *   This creates R with RSN = 1 on all nodes except for N.
1232          *
1233          * - Now node N is turned on again. The following recovery will chose
1234          *   the older empty copy of R due to RSN 10 > RSN 1.
1235          *
1236          * ==> Hence the record is gone after the recovery.
1237          *
1238          * On databases like Samba's registry, this can damage the higher-level
1239          * data structures built from the various tdb-level records.
1240          */
1241         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1242                 return 0;
1243         }
1244
1245         /* update the dmaster field to point to us */
1246         hdr = (struct ctdb_ltdb_header *)data.dptr;
1247         if (!params->persistent) {
1248                 hdr->dmaster = params->ctdb->pnn;
1249                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1250         }
1251
1252         /* add the record to the blob ready to send to the nodes */
1253         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1254         if (rec == NULL) {
1255                 params->failed = true;
1256                 return -1;
1257         }
1258         if (params->len + rec->length >= params->allocated_len) {
1259                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1260                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1261         }
1262         if (params->recdata == NULL) {
1263                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1264                          rec->length + params->len));
1265                 params->failed = true;
1266                 return -1;
1267         }
1268         params->recdata->count++;
1269         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1270         params->len += rec->length;
1271         talloc_free(rec);
1272
1273         return 0;
1274 }
1275
1276 /*
1277   push the recdb database out to all nodes
1278  */
1279 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1280                                bool persistent,
1281                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1282 {
1283         struct recdb_data params;
1284         struct ctdb_marshall_buffer *recdata;
1285         TDB_DATA outdata;
1286         TALLOC_CTX *tmp_ctx;
1287         uint32_t *nodes;
1288
1289         tmp_ctx = talloc_new(ctdb);
1290         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1291
1292         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1293         CTDB_NO_MEMORY(ctdb, recdata);
1294
1295         recdata->db_id = dbid;
1296
1297         params.ctdb = ctdb;
1298         params.recdata = recdata;
1299         params.len = offsetof(struct ctdb_marshall_buffer, data);
1300         params.allocated_len = params.len;
1301         params.failed = false;
1302         params.persistent = persistent;
1303
1304         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1305                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1306                 talloc_free(params.recdata);
1307                 talloc_free(tmp_ctx);
1308                 return -1;
1309         }
1310
1311         if (params.failed) {
1312                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1313                 talloc_free(params.recdata);
1314                 talloc_free(tmp_ctx);
1315                 return -1;              
1316         }
1317
1318         recdata = params.recdata;
1319
1320         outdata.dptr = (void *)recdata;
1321         outdata.dsize = params.len;
1322
1323         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1324         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1325                                         nodes, 0,
1326                                         CONTROL_TIMEOUT(), false, outdata,
1327                                         NULL, NULL,
1328                                         NULL) != 0) {
1329                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1330                 talloc_free(recdata);
1331                 talloc_free(tmp_ctx);
1332                 return -1;
1333         }
1334
1335         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1336                   dbid, recdata->count));
1337
1338         talloc_free(recdata);
1339         talloc_free(tmp_ctx);
1340
1341         return 0;
1342 }
1343
1344
1345 /*
1346   go through a full recovery on one database 
1347  */
1348 static int recover_database(struct ctdb_recoverd *rec, 
1349                             TALLOC_CTX *mem_ctx,
1350                             uint32_t dbid,
1351                             bool persistent,
1352                             uint32_t pnn, 
1353                             struct ctdb_node_map *nodemap,
1354                             uint32_t transaction_id)
1355 {
1356         struct tdb_wrap *recdb;
1357         int ret;
1358         struct ctdb_context *ctdb = rec->ctdb;
1359         TDB_DATA data;
1360         struct ctdb_control_wipe_database w;
1361         uint32_t *nodes;
1362
1363         recdb = create_recdb(ctdb, mem_ctx);
1364         if (recdb == NULL) {
1365                 return -1;
1366         }
1367
1368         /* pull all remote databases onto the recdb */
1369         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1370         if (ret != 0) {
1371                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1372                 return -1;
1373         }
1374
1375         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1376
1377         /* wipe all the remote databases. This is safe as we are in a transaction */
1378         w.db_id = dbid;
1379         w.transaction_id = transaction_id;
1380
1381         data.dptr = (void *)&w;
1382         data.dsize = sizeof(w);
1383
1384         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1385         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1386                                         nodes, 0,
1387                                         CONTROL_TIMEOUT(), false, data,
1388                                         NULL, NULL,
1389                                         NULL) != 0) {
1390                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1391                 talloc_free(recdb);
1392                 return -1;
1393         }
1394         
1395         /* push out the correct database. This sets the dmaster and skips 
1396            the empty records */
1397         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1398         if (ret != 0) {
1399                 talloc_free(recdb);
1400                 return -1;
1401         }
1402
1403         /* all done with this database */
1404         talloc_free(recdb);
1405
1406         return 0;
1407 }
1408
1409 /*
1410   reload the nodes file 
1411 */
1412 static void reload_nodes_file(struct ctdb_context *ctdb)
1413 {
1414         ctdb->nodes = NULL;
1415         ctdb_load_nodes_file(ctdb);
1416 }
1417
1418 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1419                                          struct ctdb_recoverd *rec,
1420                                          struct ctdb_node_map *nodemap,
1421                                          uint32_t *culprit)
1422 {
1423         int j;
1424         int ret;
1425
1426         if (ctdb->num_nodes != nodemap->num) {
1427                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1428                                   ctdb->num_nodes, nodemap->num));
1429                 if (culprit) {
1430                         *culprit = ctdb->pnn;
1431                 }
1432                 return -1;
1433         }
1434
1435         for (j=0; j<nodemap->num; j++) {
1436                 /* For readability */
1437                 struct ctdb_node *node = ctdb->nodes[j];
1438
1439                 /* release any existing data */
1440                 if (node->known_public_ips) {
1441                         talloc_free(node->known_public_ips);
1442                         node->known_public_ips = NULL;
1443                 }
1444                 if (node->available_public_ips) {
1445                         talloc_free(node->available_public_ips);
1446                         node->available_public_ips = NULL;
1447                 }
1448
1449                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1450                         continue;
1451                 }
1452
1453                 /* Retrieve the list of known public IPs from the node */
1454                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1455                                         CONTROL_TIMEOUT(),
1456                                         node->pnn,
1457                                         ctdb->nodes,
1458                                         0,
1459                                         &node->known_public_ips);
1460                 if (ret != 0) {
1461                         DEBUG(DEBUG_ERR,
1462                               ("Failed to read known public IPs from node: %u\n",
1463                                node->pnn));
1464                         if (culprit) {
1465                                 *culprit = node->pnn;
1466                         }
1467                         return -1;
1468                 }
1469
1470                 if (ctdb->do_checkpublicip &&
1471                     (rec->ip_check_disable_ctx == NULL) &&
1472                     verify_remote_ip_allocation(ctdb,
1473                                                  node->known_public_ips,
1474                                                  node->pnn)) {
1475                         DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
1476                         rec->need_takeover_run = true;
1477                 }
1478
1479                 /* Retrieve the list of available public IPs from the node */
1480                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1481                                         CONTROL_TIMEOUT(),
1482                                         node->pnn,
1483                                         ctdb->nodes,
1484                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1485                                         &node->available_public_ips);
1486                 if (ret != 0) {
1487                         DEBUG(DEBUG_ERR,
1488                               ("Failed to read available public IPs from node: %u\n",
1489                                node->pnn));
1490                         if (culprit) {
1491                                 *culprit = node->pnn;
1492                         }
1493                         return -1;
1494                 }
1495         }
1496
1497         return 0;
1498 }
1499
1500 /* when we start a recovery, make sure all nodes use the same reclock file
1501    setting
1502 */
1503 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1504 {
1505         struct ctdb_context *ctdb = rec->ctdb;
1506         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1507         TDB_DATA data;
1508         uint32_t *nodes;
1509
1510         if (ctdb->recovery_lock_file == NULL) {
1511                 data.dptr  = NULL;
1512                 data.dsize = 0;
1513         } else {
1514                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1515                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1516         }
1517
1518         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1519         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1520                                         nodes, 0,
1521                                         CONTROL_TIMEOUT(),
1522                                         false, data,
1523                                         NULL, NULL,
1524                                         rec) != 0) {
1525                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1526                 talloc_free(tmp_ctx);
1527                 return -1;
1528         }
1529
1530         talloc_free(tmp_ctx);
1531         return 0;
1532 }
1533
1534
1535 /*
1536  * this callback is called for every node that failed to execute ctdb_takeover_run()
1537  * and set flag to re-run takeover run.
1538  */
1539 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1540 {
1541         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1542
1543         if (callback_data != NULL) {
1544                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1545
1546                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1547
1548                 ctdb_set_culprit(rec, node_pnn);
1549                 rec->need_takeover_run = true;
1550         }
1551 }
1552
1553
1554 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1555 {
1556         struct ctdb_context *ctdb = rec->ctdb;
1557         int i;
1558         struct ctdb_banning_state *ban_state;
1559
1560         *self_ban = false;
1561         for (i=0; i<ctdb->num_nodes; i++) {
1562                 if (ctdb->nodes[i]->ban_state == NULL) {
1563                         continue;
1564                 }
1565                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1566                 if (ban_state->count < 2*ctdb->num_nodes) {
1567                         continue;
1568                 }
1569
1570                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1571                         ctdb->nodes[i]->pnn, ban_state->count,
1572                         ctdb->tunable.recovery_ban_period));
1573                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1574                 ban_state->count = 0;
1575
1576                 /* Banning ourself? */
1577                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1578                         *self_ban = true;
1579                 }
1580         }
1581 }
1582
1583
1584 /*
1585   we are the recmaster, and recovery is needed - start a recovery run
1586  */
1587 static int do_recovery(struct ctdb_recoverd *rec, 
1588                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1589                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1590 {
1591         struct ctdb_context *ctdb = rec->ctdb;
1592         int i, j, ret;
1593         uint32_t generation;
1594         struct ctdb_dbid_map *dbmap;
1595         TDB_DATA data;
1596         uint32_t *nodes;
1597         struct timeval start_time;
1598         uint32_t culprit = (uint32_t)-1;
1599         bool self_ban;
1600
1601         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1602
1603         /* if recovery fails, force it again */
1604         rec->need_recovery = true;
1605
1606         ban_misbehaving_nodes(rec, &self_ban);
1607         if (self_ban) {
1608                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1609                 return -1;
1610         }
1611
1612         if (ctdb->tunable.verify_recovery_lock != 0) {
1613                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1614                 start_time = timeval_current();
1615                 if (!ctdb_recovery_lock(ctdb, true)) {
1616                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1617                                          "and ban ourself for %u seconds\n",
1618                                          ctdb->tunable.recovery_ban_period));
1619                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1620                         return -1;
1621                 }
1622                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1623                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1624         }
1625
1626         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1627
1628         /* get a list of all databases */
1629         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1630         if (ret != 0) {
1631                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1632                 return -1;
1633         }
1634
1635         /* we do the db creation before we set the recovery mode, so the freeze happens
1636            on all databases we will be dealing with. */
1637
1638         /* verify that we have all the databases any other node has */
1639         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1640         if (ret != 0) {
1641                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1642                 return -1;
1643         }
1644
1645         /* verify that all other nodes have all our databases */
1646         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1647         if (ret != 0) {
1648                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1649                 return -1;
1650         }
1651         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1652
1653         /* update the database priority for all remote databases */
1654         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1655         if (ret != 0) {
1656                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1657         }
1658         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1659
1660
1661         /* update all other nodes to use the same setting for reclock files
1662            as the local recovery master.
1663         */
1664         sync_recovery_lock_file_across_cluster(rec);
1665
1666         /* set recovery mode to active on all nodes */
1667         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1668         if (ret != 0) {
1669                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1670                 return -1;
1671         }
1672
1673         /* execute the "startrecovery" event script on all nodes */
1674         ret = run_startrecovery_eventscript(rec, nodemap);
1675         if (ret!=0) {
1676                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1677                 return -1;
1678         }
1679
1680         /*
1681           update all nodes to have the same flags that we have
1682          */
1683         for (i=0;i<nodemap->num;i++) {
1684                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1685                         continue;
1686                 }
1687
1688                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1689                 if (ret != 0) {
1690                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1691                         return -1;
1692                 }
1693         }
1694
1695         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1696
1697         /* pick a new generation number */
1698         generation = new_generation();
1699
1700         /* change the vnnmap on this node to use the new generation 
1701            number but not on any other nodes.
1702            this guarantees that if we abort the recovery prematurely
1703            for some reason (a node stops responding?)
1704            that we can just return immediately and we will reenter
1705            recovery shortly again.
1706            I.e. we deliberately leave the cluster with an inconsistent
1707            generation id to allow us to abort recovery at any stage and
1708            just restart it from scratch.
1709          */
1710         vnnmap->generation = generation;
1711         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1712         if (ret != 0) {
1713                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1714                 return -1;
1715         }
1716
1717         data.dptr = (void *)&generation;
1718         data.dsize = sizeof(uint32_t);
1719
1720         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1721         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1722                                         nodes, 0,
1723                                         CONTROL_TIMEOUT(), false, data,
1724                                         NULL,
1725                                         transaction_start_fail_callback,
1726                                         rec) != 0) {
1727                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1728                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1729                                         nodes, 0,
1730                                         CONTROL_TIMEOUT(), false, tdb_null,
1731                                         NULL,
1732                                         NULL,
1733                                         NULL) != 0) {
1734                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1735                 }
1736                 return -1;
1737         }
1738
1739         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1740
1741         for (i=0;i<dbmap->num;i++) {
1742                 ret = recover_database(rec, mem_ctx,
1743                                        dbmap->dbs[i].dbid,
1744                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1745                                        pnn, nodemap, generation);
1746                 if (ret != 0) {
1747                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1748                         return -1;
1749                 }
1750         }
1751
1752         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1753
1754         /* commit all the changes */
1755         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1756                                         nodes, 0,
1757                                         CONTROL_TIMEOUT(), false, data,
1758                                         NULL, NULL,
1759                                         NULL) != 0) {
1760                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1761                 return -1;
1762         }
1763
1764         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1765         
1766
1767         /* update the capabilities for all nodes */
1768         ret = update_capabilities(ctdb, nodemap);
1769         if (ret!=0) {
1770                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1771                 return -1;
1772         }
1773
1774         /* build a new vnn map with all the currently active and
1775            unbanned nodes */
1776         generation = new_generation();
1777         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1778         CTDB_NO_MEMORY(ctdb, vnnmap);
1779         vnnmap->generation = generation;
1780         vnnmap->size = 0;
1781         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1782         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1783         for (i=j=0;i<nodemap->num;i++) {
1784                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1785                         continue;
1786                 }
1787                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1788                         /* this node can not be an lmaster */
1789                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1790                         continue;
1791                 }
1792
1793                 vnnmap->size++;
1794                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1795                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1796                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1797
1798         }
1799         if (vnnmap->size == 0) {
1800                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1801                 vnnmap->size++;
1802                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1803                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1804                 vnnmap->map[0] = pnn;
1805         }       
1806
1807         /* update to the new vnnmap on all nodes */
1808         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1809         if (ret != 0) {
1810                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1811                 return -1;
1812         }
1813
1814         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1815
1816         /* update recmaster to point to us for all nodes */
1817         ret = set_recovery_master(ctdb, nodemap, pnn);
1818         if (ret!=0) {
1819                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1820                 return -1;
1821         }
1822
1823         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1824
1825         /*
1826           update all nodes to have the same flags that we have
1827          */
1828         for (i=0;i<nodemap->num;i++) {
1829                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1830                         continue;
1831                 }
1832
1833                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1834                 if (ret != 0) {
1835                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1836                         return -1;
1837                 }
1838         }
1839
1840         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1841
1842         /* disable recovery mode */
1843         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1844         if (ret != 0) {
1845                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1846                 return -1;
1847         }
1848
1849         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1850
1851         /* Fetch known/available public IPs from each active node */
1852         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1853         if (ret != 0) {
1854                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1855                                  culprit));
1856                 rec->need_takeover_run = true;
1857                 return -1;
1858         }
1859         rec->need_takeover_run = false;
1860         ret = ctdb_takeover_run(ctdb, nodemap, takeover_fail_callback, NULL);
1861         if (ret != 0) {
1862                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1863                 rec->need_takeover_run = true;
1864         }
1865
1866         /* execute the "recovered" event script on all nodes */
1867         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
1868         if (ret!=0) {
1869                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1870                 return -1;
1871         }
1872
1873         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1874
1875         /* send a message to all clients telling them that the cluster 
1876            has been reconfigured */
1877         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1878
1879         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1880
1881         rec->need_recovery = false;
1882
1883         /* we managed to complete a full recovery, make sure to forgive
1884            any past sins by the nodes that could now participate in the
1885            recovery.
1886         */
1887         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1888         for (i=0;i<nodemap->num;i++) {
1889                 struct ctdb_banning_state *ban_state;
1890
1891                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1892                         continue;
1893                 }
1894
1895                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1896                 if (ban_state == NULL) {
1897                         continue;
1898                 }
1899
1900                 ban_state->count = 0;
1901         }
1902
1903
1904         /* We just finished a recovery successfully. 
1905            We now wait for rerecovery_timeout before we allow 
1906            another recovery to take place.
1907         */
1908         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1909         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1910         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1911
1912         return 0;
1913 }
1914
1915
1916 /*
1917   elections are won by first checking the number of connected nodes, then
1918   the priority time, then the pnn
1919  */
1920 struct election_message {
1921         uint32_t num_connected;
1922         struct timeval priority_time;
1923         uint32_t pnn;
1924         uint32_t node_flags;
1925 };
1926
1927 /*
1928   form this nodes election data
1929  */
1930 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1931 {
1932         int ret, i;
1933         struct ctdb_node_map *nodemap;
1934         struct ctdb_context *ctdb = rec->ctdb;
1935
1936         ZERO_STRUCTP(em);
1937
1938         em->pnn = rec->ctdb->pnn;
1939         em->priority_time = rec->priority_time;
1940
1941         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1942         if (ret != 0) {
1943                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1944                 return;
1945         }
1946
1947         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1948         em->node_flags = rec->node_flags;
1949
1950         for (i=0;i<nodemap->num;i++) {
1951                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1952                         em->num_connected++;
1953                 }
1954         }
1955
1956         /* we shouldnt try to win this election if we cant be a recmaster */
1957         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1958                 em->num_connected = 0;
1959                 em->priority_time = timeval_current();
1960         }
1961
1962         talloc_free(nodemap);
1963 }
1964
1965 /*
1966   see if the given election data wins
1967  */
1968 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1969 {
1970         struct election_message myem;
1971         int cmp = 0;
1972
1973         ctdb_election_data(rec, &myem);
1974
1975         /* we cant win if we dont have the recmaster capability */
1976         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1977                 return false;
1978         }
1979
1980         /* we cant win if we are banned */
1981         if (rec->node_flags & NODE_FLAGS_BANNED) {
1982                 return false;
1983         }
1984
1985         /* we cant win if we are stopped */
1986         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1987                 return false;
1988         }
1989
1990         /* we will automatically win if the other node is banned */
1991         if (em->node_flags & NODE_FLAGS_BANNED) {
1992                 return true;
1993         }
1994
1995         /* we will automatically win if the other node is banned */
1996         if (em->node_flags & NODE_FLAGS_STOPPED) {
1997                 return true;
1998         }
1999
2000         /* try to use the most connected node */
2001         if (cmp == 0) {
2002                 cmp = (int)myem.num_connected - (int)em->num_connected;
2003         }
2004
2005         /* then the longest running node */
2006         if (cmp == 0) {
2007                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2008         }
2009
2010         if (cmp == 0) {
2011                 cmp = (int)myem.pnn - (int)em->pnn;
2012         }
2013
2014         return cmp > 0;
2015 }
2016
2017 /*
2018   send out an election request
2019  */
2020 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
2021 {
2022         int ret;
2023         TDB_DATA election_data;
2024         struct election_message emsg;
2025         uint64_t srvid;
2026         struct ctdb_context *ctdb = rec->ctdb;
2027
2028         srvid = CTDB_SRVID_RECOVERY;
2029
2030         ctdb_election_data(rec, &emsg);
2031
2032         election_data.dsize = sizeof(struct election_message);
2033         election_data.dptr  = (unsigned char *)&emsg;
2034
2035
2036         /* send an election message to all active nodes */
2037         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2038         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2039
2040
2041         /* A new node that is already frozen has entered the cluster.
2042            The existing nodes are not frozen and dont need to be frozen
2043            until the election has ended and we start the actual recovery
2044         */
2045         if (update_recmaster == true) {
2046                 /* first we assume we will win the election and set 
2047                    recoverymaster to be ourself on the current node
2048                  */
2049                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2050                 if (ret != 0) {
2051                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2052                         return -1;
2053                 }
2054         }
2055
2056
2057         return 0;
2058 }
2059
2060 /*
2061   this function will unban all nodes in the cluster
2062 */
2063 static void unban_all_nodes(struct ctdb_context *ctdb)
2064 {
2065         int ret, i;
2066         struct ctdb_node_map *nodemap;
2067         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2068         
2069         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2070         if (ret != 0) {
2071                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2072                 return;
2073         }
2074
2075         for (i=0;i<nodemap->num;i++) {
2076                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2077                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2078                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
2079                 }
2080         }
2081
2082         talloc_free(tmp_ctx);
2083 }
2084
2085
2086 /*
2087   we think we are winning the election - send a broadcast election request
2088  */
2089 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2090 {
2091         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2092         int ret;
2093
2094         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
2095         if (ret != 0) {
2096                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2097         }
2098
2099         talloc_free(rec->send_election_te);
2100         rec->send_election_te = NULL;
2101 }
2102
2103 /*
2104   handler for memory dumps
2105 */
2106 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2107                              TDB_DATA data, void *private_data)
2108 {
2109         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2110         TDB_DATA *dump;
2111         int ret;
2112         struct rd_memdump_reply *rd;
2113
2114         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2115                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2116                 talloc_free(tmp_ctx);
2117                 return;
2118         }
2119         rd = (struct rd_memdump_reply *)data.dptr;
2120
2121         dump = talloc_zero(tmp_ctx, TDB_DATA);
2122         if (dump == NULL) {
2123                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2124                 talloc_free(tmp_ctx);
2125                 return;
2126         }
2127         ret = ctdb_dump_memory(ctdb, dump);
2128         if (ret != 0) {
2129                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2130                 talloc_free(tmp_ctx);
2131                 return;
2132         }
2133
2134 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
2135
2136         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2137         if (ret != 0) {
2138                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2139                 talloc_free(tmp_ctx);
2140                 return;
2141         }
2142
2143         talloc_free(tmp_ctx);
2144 }
2145
2146 /*
2147   handler for getlog
2148 */
2149 static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2150                            TDB_DATA data, void *private_data)
2151 {
2152         struct ctdb_get_log_addr *log_addr;
2153         pid_t child;
2154
2155         if (data.dsize != sizeof(struct ctdb_get_log_addr)) {
2156                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2157                 return;
2158         }
2159         log_addr = (struct ctdb_get_log_addr *)data.dptr;
2160
2161         child = ctdb_fork_no_free_ringbuffer(ctdb);
2162         if (child == (pid_t)-1) {
2163                 DEBUG(DEBUG_ERR,("Failed to fork a log collector child\n"));
2164                 return;
2165         }
2166
2167         if (child == 0) {
2168                 ctdb_set_process_name("ctdb_rec_log_collector");
2169                 if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
2170                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch log collector child into client mode.\n"));
2171                         _exit(1);
2172                 }
2173                 ctdb_collect_log(ctdb, log_addr);
2174                 _exit(0);
2175         }
2176 }
2177
2178 /*
2179   handler for clearlog
2180 */
2181 static void clearlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2182                              TDB_DATA data, void *private_data)
2183 {
2184         ctdb_clear_log(ctdb);
2185 }
2186
2187 /*
2188   handler for reload_nodes
2189 */
2190 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2191                              TDB_DATA data, void *private_data)
2192 {
2193         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2194
2195         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2196
2197         reload_nodes_file(rec->ctdb);
2198 }
2199
2200
2201 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
2202                               struct timeval yt, void *p)
2203 {
2204         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2205
2206         talloc_free(rec->ip_check_disable_ctx);
2207         rec->ip_check_disable_ctx = NULL;
2208 }
2209
2210
2211 static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
2212                                   struct timeval t, void *p)
2213 {
2214         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2215         struct ctdb_context *ctdb = rec->ctdb;
2216         int ret;
2217
2218         DEBUG(DEBUG_NOTICE,("Rebalance all nodes that have had ip assignment changes.\n"));
2219
2220         ret = ctdb_takeover_run(ctdb, rec->nodemap, takeover_fail_callback, NULL);
2221         if (ret != 0) {
2222                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
2223                 rec->need_takeover_run = true;
2224         }
2225
2226         talloc_free(rec->deferred_rebalance_ctx);
2227         rec->deferred_rebalance_ctx = NULL;
2228 }
2229
2230         
2231 static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2232                              TDB_DATA data, void *private_data)
2233 {
2234         uint32_t pnn;
2235         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2236
2237         if (data.dsize != sizeof(uint32_t)) {
2238                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2239                 return;
2240         }
2241
2242         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2243                 return;
2244         }
2245
2246         pnn = *(uint32_t *)&data.dptr[0];
2247
2248         lcp2_forcerebalance(ctdb, pnn);
2249         DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
2250
2251         if (rec->deferred_rebalance_ctx != NULL) {
2252                 talloc_free(rec->deferred_rebalance_ctx);
2253         }
2254         rec->deferred_rebalance_ctx = talloc_new(rec);
2255         event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
2256                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2257                         ctdb_rebalance_timeout, rec);
2258 }
2259
2260
2261
2262 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2263                              TDB_DATA data, void *private_data)
2264 {
2265         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2266         struct ctdb_public_ip *ip;
2267
2268         if (rec->recmaster != rec->ctdb->pnn) {
2269                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2270                 return;
2271         }
2272
2273         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2274                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2275                 return;
2276         }
2277
2278         ip = (struct ctdb_public_ip *)data.dptr;
2279
2280         update_ip_assignment_tree(rec->ctdb, ip);
2281 }
2282
2283
2284 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2285                              TDB_DATA data, void *private_data)
2286 {
2287         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2288         uint32_t timeout;
2289
2290         if (rec->ip_check_disable_ctx != NULL) {
2291                 talloc_free(rec->ip_check_disable_ctx);
2292                 rec->ip_check_disable_ctx = NULL;
2293         }
2294
2295         if (data.dsize != sizeof(uint32_t)) {
2296                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2297                                  "expexting %lu\n", (long unsigned)data.dsize,
2298                                  (long unsigned)sizeof(uint32_t)));
2299                 return;
2300         }
2301         if (data.dptr == NULL) {
2302                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
2303                 return;
2304         }
2305
2306         timeout = *((uint32_t *)data.dptr);
2307
2308         if (timeout == 0) {
2309                 DEBUG(DEBUG_NOTICE,("Reenabling ip check\n"));
2310                 return;
2311         }
2312                 
2313         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
2314
2315         rec->ip_check_disable_ctx = talloc_new(rec);
2316         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2317
2318         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2319 }
2320
2321
2322 /*
2323   handler for reload all ips.
2324 */
2325 static void ip_reloadall_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2326                              TDB_DATA data, void *private_data)
2327 {
2328         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2329
2330         if (data.dsize != sizeof(struct reloadips_all_reply)) {
2331                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2332                 return;
2333         }
2334
2335         reload_all_ips_request = (struct reloadips_all_reply *)talloc_steal(rec, data.dptr);
2336
2337         DEBUG(DEBUG_NOTICE,("RELOAD_ALL_IPS message received from node:%d srvid:%d\n", reload_all_ips_request->pnn, (int)reload_all_ips_request->srvid));
2338         return;
2339 }
2340
2341 static void async_reloadips_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2342 {
2343         uint32_t *status = callback_data;
2344
2345         if (res != 0) {
2346                 DEBUG(DEBUG_ERR,("Reload ips all failed on node %d\n", node_pnn));
2347                 *status = 1;
2348         }
2349 }
2350
2351 static int
2352 reload_all_ips(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, struct reloadips_all_reply *rips)
2353 {
2354         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2355         uint32_t *nodes;
2356         uint32_t status;
2357         int i;
2358
2359         DEBUG(DEBUG_ERR,("RELOAD ALL IPS on all active nodes\n"));
2360         for (i = 0; i< nodemap->num; i++) {
2361                 if (nodemap->nodes[i].flags != 0) {
2362                         DEBUG(DEBUG_ERR, ("Can not reload ips on all nodes. Node %d is not up and healthy\n", i));
2363                         talloc_free(tmp_ctx);
2364                         return -1;
2365                 }
2366         }
2367
2368         /* send the flags update to all connected nodes */
2369         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2370         status = 0;
2371         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
2372                                         nodes, 0,
2373                                         CONTROL_TIMEOUT(),
2374                                         false, tdb_null,
2375                                         async_reloadips_callback, NULL,
2376                                         &status) != 0) {
2377                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2378                 talloc_free(tmp_ctx);
2379                 return -1;
2380         }
2381
2382         if (status != 0) {
2383                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2384                 talloc_free(tmp_ctx);
2385                 return -1;
2386         }
2387
2388         ctdb_client_send_message(ctdb, rips->pnn, rips->srvid, tdb_null);
2389
2390         talloc_free(tmp_ctx);
2391         return 0;
2392 }
2393
2394
2395 /*
2396   handler for ip reallocate, just add it to the list of callers and 
2397   handle this later in the monitor_cluster loop so we do not recurse
2398   with other callers to takeover_run()
2399 */
2400 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2401                              TDB_DATA data, void *private_data)
2402 {
2403         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2404         struct ip_reallocate_list *caller;
2405
2406         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2407                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2408                 return;
2409         }
2410
2411         if (rec->ip_reallocate_ctx == NULL) {
2412                 rec->ip_reallocate_ctx = talloc_new(rec);
2413                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2414         }
2415
2416         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2417         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2418
2419         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2420         caller->next = rec->reallocate_callers;
2421         rec->reallocate_callers = caller;
2422
2423         return;
2424 }
2425
2426 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2427 {
2428         TDB_DATA result;
2429         int32_t ret;
2430         struct ip_reallocate_list *callers;
2431         uint32_t culprit;
2432
2433         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2434
2435         /* update the list of public ips that a node can handle for
2436            all connected nodes
2437         */
2438         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2439         if (ret != 0) {
2440                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2441                                  culprit));
2442                 rec->need_takeover_run = true;
2443         }
2444         if (ret == 0) {
2445                 ret = ctdb_takeover_run(ctdb, rec->nodemap, takeover_fail_callback, NULL);
2446                 if (ret != 0) {
2447                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2448                         rec->need_takeover_run = true;
2449                 }
2450         }
2451
2452         result.dsize = sizeof(int32_t);
2453         result.dptr  = (uint8_t *)&ret;
2454
2455         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2456
2457                 /* Someone that sent srvid==0 does not want a reply */
2458                 if (callers->rd->srvid == 0) {
2459                         continue;
2460                 }
2461                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2462                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2463                                   (unsigned long long)callers->rd->srvid));
2464                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2465                 if (ret != 0) {
2466                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2467                                          "message to %u:%llu\n",
2468                                          (unsigned)callers->rd->pnn,
2469                                          (unsigned long long)callers->rd->srvid));
2470                 }
2471         }
2472
2473         talloc_free(rec->ip_reallocate_ctx);
2474         rec->ip_reallocate_ctx = NULL;
2475         rec->reallocate_callers = NULL;
2476 }
2477
2478
2479 /*
2480   handler for recovery master elections
2481 */
2482 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2483                              TDB_DATA data, void *private_data)
2484 {
2485         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2486         int ret;
2487         struct election_message *em = (struct election_message *)data.dptr;
2488         TALLOC_CTX *mem_ctx;
2489
2490         /* we got an election packet - update the timeout for the election */
2491         talloc_free(rec->election_timeout);
2492         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2493                                                 fast_start ?
2494                                                 timeval_current_ofs(0, 500000) :
2495                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2496                                                 ctdb_election_timeout, rec);
2497
2498         mem_ctx = talloc_new(ctdb);
2499
2500         /* someone called an election. check their election data
2501            and if we disagree and we would rather be the elected node, 
2502            send a new election message to all other nodes
2503          */
2504         if (ctdb_election_win(rec, em)) {
2505                 if (!rec->send_election_te) {
2506                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2507                                                                 timeval_current_ofs(0, 500000),
2508                                                                 election_send_request, rec);
2509                 }
2510                 talloc_free(mem_ctx);
2511                 /*unban_all_nodes(ctdb);*/
2512                 return;
2513         }
2514         
2515         /* we didn't win */
2516         talloc_free(rec->send_election_te);
2517         rec->send_election_te = NULL;
2518
2519         if (ctdb->tunable.verify_recovery_lock != 0) {
2520                 /* release the recmaster lock */
2521                 if (em->pnn != ctdb->pnn &&
2522                     ctdb->recovery_lock_fd != -1) {
2523                         close(ctdb->recovery_lock_fd);
2524                         ctdb->recovery_lock_fd = -1;
2525                         unban_all_nodes(ctdb);
2526                 }
2527         }
2528
2529         /* ok, let that guy become recmaster then */
2530         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2531         if (ret != 0) {
2532                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2533                 talloc_free(mem_ctx);
2534                 return;
2535         }
2536
2537         talloc_free(mem_ctx);
2538         return;
2539 }
2540
2541
2542 /*
2543   force the start of the election process
2544  */
2545 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2546                            struct ctdb_node_map *nodemap)
2547 {
2548         int ret;
2549         struct ctdb_context *ctdb = rec->ctdb;
2550
2551         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2552
2553         /* set all nodes to recovery mode to stop all internode traffic */
2554         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2555         if (ret != 0) {
2556                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2557                 return;
2558         }
2559
2560         talloc_free(rec->election_timeout);
2561         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2562                                                 fast_start ?
2563                                                 timeval_current_ofs(0, 500000) :
2564                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2565                                                 ctdb_election_timeout, rec);
2566
2567         ret = send_election_request(rec, pnn, true);
2568         if (ret!=0) {
2569                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2570                 return;
2571         }
2572
2573         /* wait for a few seconds to collect all responses */
2574         ctdb_wait_election(rec);
2575 }
2576
2577
2578
2579 /*
2580   handler for when a node changes its flags
2581 */
2582 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2583                             TDB_DATA data, void *private_data)
2584 {
2585         int ret;
2586         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2587         struct ctdb_node_map *nodemap=NULL;
2588         TALLOC_CTX *tmp_ctx;
2589         int i;
2590         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2591         int disabled_flag_changed;
2592
2593         if (data.dsize != sizeof(*c)) {
2594                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2595                 return;
2596         }
2597
2598         tmp_ctx = talloc_new(ctdb);
2599         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2600
2601         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2602         if (ret != 0) {
2603                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2604                 talloc_free(tmp_ctx);
2605                 return;         
2606         }
2607
2608
2609         for (i=0;i<nodemap->num;i++) {
2610                 if (nodemap->nodes[i].pnn == c->pnn) break;
2611         }
2612
2613         if (i == nodemap->num) {
2614                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2615                 talloc_free(tmp_ctx);
2616                 return;
2617         }
2618
2619         if (c->old_flags != c->new_flags) {
2620                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2621         }
2622
2623         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2624
2625         nodemap->nodes[i].flags = c->new_flags;
2626
2627         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2628                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2629
2630         if (ret == 0) {
2631                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2632                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2633         }
2634         
2635         if (ret == 0 &&
2636             ctdb->recovery_master == ctdb->pnn &&
2637             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2638                 /* Only do the takeover run if the perm disabled or unhealthy
2639                    flags changed since these will cause an ip failover but not
2640                    a recovery.
2641                    If the node became disconnected or banned this will also
2642                    lead to an ip address failover but that is handled 
2643                    during recovery
2644                 */
2645                 if (disabled_flag_changed) {
2646                         rec->need_takeover_run = true;
2647                 }
2648         }
2649
2650         talloc_free(tmp_ctx);
2651 }
2652
2653 /*
2654   handler for when we need to push out flag changes ot all other nodes
2655 */
2656 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2657                             TDB_DATA data, void *private_data)
2658 {
2659         int ret;
2660         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2661         struct ctdb_node_map *nodemap=NULL;
2662         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2663         uint32_t recmaster;
2664         uint32_t *nodes;
2665
2666         /* find the recovery master */
2667         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2668         if (ret != 0) {
2669                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2670                 talloc_free(tmp_ctx);
2671                 return;
2672         }
2673
2674         /* read the node flags from the recmaster */
2675         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2676         if (ret != 0) {
2677                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2678                 talloc_free(tmp_ctx);
2679                 return;
2680         }
2681         if (c->pnn >= nodemap->num) {
2682                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2683                 talloc_free(tmp_ctx);
2684                 return;
2685         }
2686
2687         /* send the flags update to all connected nodes */
2688         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2689
2690         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2691                                       nodes, 0, CONTROL_TIMEOUT(),
2692                                       false, data,
2693                                       NULL, NULL,
2694                                       NULL) != 0) {
2695                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2696
2697                 talloc_free(tmp_ctx);
2698                 return;
2699         }
2700
2701         talloc_free(tmp_ctx);
2702 }
2703
2704
2705 struct verify_recmode_normal_data {
2706         uint32_t count;
2707         enum monitor_result status;
2708 };
2709
2710 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2711 {
2712         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2713
2714
2715         /* one more node has responded with recmode data*/
2716         rmdata->count--;
2717
2718         /* if we failed to get the recmode, then return an error and let
2719            the main loop try again.
2720         */
2721         if (state->state != CTDB_CONTROL_DONE) {
2722                 if (rmdata->status == MONITOR_OK) {
2723                         rmdata->status = MONITOR_FAILED;
2724                 }
2725                 return;
2726         }
2727
2728         /* if we got a response, then the recmode will be stored in the
2729            status field
2730         */
2731         if (state->status != CTDB_RECOVERY_NORMAL) {
2732                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2733                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2734         }
2735
2736         return;
2737 }
2738
2739
2740 /* verify that all nodes are in normal recovery mode */
2741 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2742 {
2743         struct verify_recmode_normal_data *rmdata;
2744         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2745         struct ctdb_client_control_state *state;
2746         enum monitor_result status;
2747         int j;
2748         
2749         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2750         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2751         rmdata->count  = 0;
2752         rmdata->status = MONITOR_OK;
2753
2754         /* loop over all active nodes and send an async getrecmode call to 
2755            them*/
2756         for (j=0; j<nodemap->num; j++) {
2757                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2758                         continue;
2759                 }
2760                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2761                                         CONTROL_TIMEOUT(), 
2762                                         nodemap->nodes[j].pnn);
2763                 if (state == NULL) {
2764                         /* we failed to send the control, treat this as 
2765                            an error and try again next iteration
2766                         */                      
2767                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2768                         talloc_free(mem_ctx);
2769                         return MONITOR_FAILED;
2770                 }
2771
2772                 /* set up the callback functions */
2773                 state->async.fn = verify_recmode_normal_callback;
2774                 state->async.private_data = rmdata;
2775
2776                 /* one more control to wait for to complete */
2777                 rmdata->count++;
2778         }
2779
2780
2781         /* now wait for up to the maximum number of seconds allowed
2782            or until all nodes we expect a response from has replied
2783         */
2784         while (rmdata->count > 0) {
2785                 event_loop_once(ctdb->ev);
2786         }
2787
2788         status = rmdata->status;
2789         talloc_free(mem_ctx);
2790         return status;
2791 }
2792
2793
2794 struct verify_recmaster_data {
2795         struct ctdb_recoverd *rec;
2796         uint32_t count;
2797         uint32_t pnn;
2798         enum monitor_result status;
2799 };
2800
2801 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2802 {
2803         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2804
2805
2806         /* one more node has responded with recmaster data*/
2807         rmdata->count--;
2808
2809         /* if we failed to get the recmaster, then return an error and let
2810            the main loop try again.
2811         */
2812         if (state->state != CTDB_CONTROL_DONE) {
2813                 if (rmdata->status == MONITOR_OK) {
2814                         rmdata->status = MONITOR_FAILED;
2815                 }
2816                 return;
2817         }
2818
2819         /* if we got a response, then the recmaster will be stored in the
2820            status field
2821         */
2822         if (state->status != rmdata->pnn) {
2823                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2824                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2825                 rmdata->status = MONITOR_ELECTION_NEEDED;
2826         }
2827
2828         return;
2829 }
2830
2831
2832 /* verify that all nodes agree that we are the recmaster */
2833 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2834 {
2835         struct ctdb_context *ctdb = rec->ctdb;
2836         struct verify_recmaster_data *rmdata;
2837         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2838         struct ctdb_client_control_state *state;
2839         enum monitor_result status;
2840         int j;
2841         
2842         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2843         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2844         rmdata->rec    = rec;
2845         rmdata->count  = 0;
2846         rmdata->pnn    = pnn;
2847         rmdata->status = MONITOR_OK;
2848
2849         /* loop over all active nodes and send an async getrecmaster call to 
2850            them*/
2851         for (j=0; j<nodemap->num; j++) {
2852                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2853                         continue;
2854                 }
2855                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2856                                         CONTROL_TIMEOUT(),
2857                                         nodemap->nodes[j].pnn);
2858                 if (state == NULL) {
2859                         /* we failed to send the control, treat this as 
2860                            an error and try again next iteration
2861                         */                      
2862                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2863                         talloc_free(mem_ctx);
2864                         return MONITOR_FAILED;
2865                 }
2866
2867                 /* set up the callback functions */
2868                 state->async.fn = verify_recmaster_callback;
2869                 state->async.private_data = rmdata;
2870
2871                 /* one more control to wait for to complete */
2872                 rmdata->count++;
2873         }
2874
2875
2876         /* now wait for up to the maximum number of seconds allowed
2877            or until all nodes we expect a response from has replied
2878         */
2879         while (rmdata->count > 0) {
2880                 event_loop_once(ctdb->ev);
2881         }
2882
2883         status = rmdata->status;
2884         talloc_free(mem_ctx);
2885         return status;
2886 }
2887
2888 static bool interfaces_have_changed(struct ctdb_context *ctdb,
2889                                     struct ctdb_recoverd *rec)
2890 {
2891         struct ctdb_control_get_ifaces *ifaces = NULL;
2892         TALLOC_CTX *mem_ctx;
2893         bool ret = false;
2894
2895         mem_ctx = talloc_new(NULL);
2896
2897         /* Read the interfaces from the local node */
2898         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
2899                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
2900                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
2901                 /* We could return an error.  However, this will be
2902                  * rare so we'll decide that the interfaces have
2903                  * actually changed, just in case.
2904                  */
2905                 talloc_free(mem_ctx);
2906                 return true;
2907         }
2908
2909         if (!rec->ifaces) {
2910                 /* We haven't been here before so things have changed */
2911                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
2912                 ret = true;
2913         } else if (rec->ifaces->num != ifaces->num) {
2914                 /* Number of interfaces has changed */
2915                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
2916                                      rec->ifaces->num, ifaces->num));
2917                 ret = true;
2918         } else {
2919                 /* See if interface names or link states have changed */
2920                 int i;
2921                 for (i = 0; i < rec->ifaces->num; i++) {
2922                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
2923                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
2924                                 DEBUG(DEBUG_NOTICE,
2925                                       ("Interface in slot %d changed: %s => %s\n",
2926                                        i, iface->name, ifaces->ifaces[i].name));
2927                                 ret = true;
2928                                 break;
2929                         }
2930                         if (iface->link_state != ifaces->ifaces[i].link_state) {
2931                                 DEBUG(DEBUG_NOTICE,
2932                                       ("Interface %s changed state: %d => %d\n",
2933                                        iface->name, iface->link_state,
2934                                        ifaces->ifaces[i].link_state));
2935                                 ret = true;
2936                                 break;
2937                         }
2938                 }
2939         }
2940
2941         talloc_free(rec->ifaces);
2942         rec->ifaces = talloc_steal(rec, ifaces);
2943
2944         talloc_free(mem_ctx);
2945         return ret;
2946 }
2947
2948 /* called to check that the local allocation of public ip addresses is ok.
2949 */
2950 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2951 {
2952         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2953         struct ctdb_uptime *uptime1 = NULL;
2954         struct ctdb_uptime *uptime2 = NULL;
2955         int ret, j;
2956         bool need_takeover_run = false;
2957
2958         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2959                                 CTDB_CURRENT_NODE, &uptime1);
2960         if (ret != 0) {
2961                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2962                 talloc_free(mem_ctx);
2963                 return -1;
2964         }
2965
2966         if (interfaces_have_changed(ctdb, rec)) {
2967                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2968                                      "local node %u - force takeover run\n",
2969                                      pnn));
2970                 need_takeover_run = true;
2971         }
2972
2973         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2974                                 CTDB_CURRENT_NODE, &uptime2);
2975         if (ret != 0) {
2976                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2977                 talloc_free(mem_ctx);
2978                 return -1;
2979         }
2980
2981         /* skip the check if the startrecovery time has changed */
2982         if (timeval_compare(&uptime1->last_recovery_started,
2983                             &uptime2->last_recovery_started) != 0) {
2984                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2985                 talloc_free(mem_ctx);
2986                 return 0;
2987         }
2988
2989         /* skip the check if the endrecovery time has changed */
2990         if (timeval_compare(&uptime1->last_recovery_finished,
2991                             &uptime2->last_recovery_finished) != 0) {
2992                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2993                 talloc_free(mem_ctx);
2994                 return 0;
2995         }
2996
2997         /* skip the check if we have started but not finished recovery */
2998         if (timeval_compare(&uptime1->last_recovery_finished,
2999                             &uptime1->last_recovery_started) != 1) {
3000                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
3001                 talloc_free(mem_ctx);
3002
3003                 return 0;
3004         }
3005
3006         /* verify that we have the ip addresses we should have
3007            and we dont have ones we shouldnt have.
3008            if we find an inconsistency we set recmode to
3009            active on the local node and wait for the recmaster
3010            to do a full blown recovery.
3011            also if the pnn is -1 and we are healthy and can host the ip
3012            we also request a ip reallocation.
3013         */
3014         if (ctdb->tunable.disable_ip_failover == 0) {
3015                 struct ctdb_all_public_ips *ips = NULL;
3016
3017                 /* read the *available* IPs from the local node */
3018                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3019                 if (ret != 0) {
3020                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3021                         talloc_free(mem_ctx);
3022                         return -1;
3023                 }
3024
3025                 for (j=0; j<ips->num; j++) {
3026                         if (ips->ips[j].pnn == -1 &&
3027                             nodemap->nodes[pnn].flags == 0) {
3028                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3029                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3030                                 need_takeover_run = true;
3031                         }
3032                 }
3033
3034                 talloc_free(ips);
3035
3036                 /* read the *known* IPs from the local node */
3037                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3038                 if (ret != 0) {
3039                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3040                         talloc_free(mem_ctx);
3041                         return -1;
3042                 }
3043
3044                 for (j=0; j<ips->num; j++) {
3045                         if (ips->ips[j].pnn == pnn) {
3046                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3047                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3048                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3049                                         need_takeover_run = true;
3050                                 }
3051                         } else {
3052                                 if (ctdb->do_checkpublicip &&
3053                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3054
3055                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3056                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3057
3058                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3059                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3060                                         }
3061                                 }
3062                         }
3063                 }
3064         }
3065
3066         if (need_takeover_run) {
3067                 struct takeover_run_reply rd;
3068                 TDB_DATA data;
3069
3070                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3071
3072                 rd.pnn = ctdb->pnn;
3073                 rd.srvid = 0;
3074                 data.dptr = (uint8_t *)&rd;
3075                 data.dsize = sizeof(rd);
3076
3077                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3078                 if (ret != 0) {
3079                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3080                 }
3081         }
3082         talloc_free(mem_ctx);
3083         return 0;
3084 }
3085
3086
3087 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3088 {
3089         struct ctdb_node_map **remote_nodemaps = callback_data;
3090
3091         if (node_pnn >= ctdb->num_nodes) {
3092                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3093                 return;
3094         }
3095
3096         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3097
3098 }
3099
3100 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3101         struct ctdb_node_map *nodemap,
3102         struct ctdb_node_map **remote_nodemaps)
3103 {
3104         uint32_t *nodes;
3105
3106         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3107         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3108                                         nodes, 0,
3109                                         CONTROL_TIMEOUT(), false, tdb_null,
3110                                         async_getnodemap_callback,
3111                                         NULL,
3112                                         remote_nodemaps) != 0) {
3113                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3114
3115                 return -1;
3116         }
3117
3118         return 0;
3119 }
3120
3121 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
3122 struct ctdb_check_reclock_state {
3123         struct ctdb_context *ctdb;
3124         struct timeval start_time;
3125         int fd[2];
3126         pid_t child;
3127         struct timed_event *te;
3128         struct fd_event *fde;
3129         enum reclock_child_status status;
3130 };
3131
3132 /* when we free the reclock state we must kill any child process.
3133 */
3134 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
3135 {
3136         struct ctdb_context *ctdb = state->ctdb;
3137
3138         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
3139
3140         if (state->fd[0] != -1) {
3141                 close(state->fd[0]);
3142                 state->fd[0] = -1;
3143         }
3144         if (state->fd[1] != -1) {
3145                 close(state->fd[1]);
3146                 state->fd[1] = -1;
3147         }
3148         ctdb_kill(ctdb, state->child, SIGKILL);
3149         return 0;
3150 }
3151
3152 /*
3153   called if our check_reclock child times out. this would happen if
3154   i/o to the reclock file blocks.
3155  */
3156 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
3157                                          struct timeval t, void *private_data)
3158 {
3159         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
3160                                            struct ctdb_check_reclock_state);
3161
3162         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
3163         state->status = RECLOCK_TIMEOUT;
3164 }
3165
3166 /* this is called when the child process has completed checking the reclock
3167    file and has written data back to us through the pipe.
3168 */
3169 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
3170                              uint16_t flags, void *private_data)
3171 {
3172         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
3173                                              struct ctdb_check_reclock_state);
3174         char c = 0;
3175         int ret;
3176
3177         /* we got a response from our child process so we can abort the
3178            timeout.
3179         */
3180         talloc_free(state->te);
3181         state->te = NULL;
3182
3183         ret = read(state->fd[0], &c, 1);
3184         if (ret != 1 || c != RECLOCK_OK) {
3185                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3186                 state->status = RECLOCK_FAILED;
3187
3188                 return;
3189         }
3190
3191         state->status = RECLOCK_OK;
3192         return;
3193 }
3194
3195 static int check_recovery_lock(struct ctdb_context *ctdb)
3196 {
3197         int ret;
3198         struct ctdb_check_reclock_state *state;
3199         pid_t parent = getpid();
3200
3201         if (ctdb->recovery_lock_fd == -1) {
3202                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3203                 return -1;
3204         }
3205
3206         state = talloc(ctdb, struct ctdb_check_reclock_state);
3207         CTDB_NO_MEMORY(ctdb, state);
3208
3209         state->ctdb = ctdb;
3210         state->start_time = timeval_current();
3211         state->status = RECLOCK_CHECKING;
3212         state->fd[0] = -1;
3213         state->fd[1] = -1;
3214
3215         ret = pipe(state->fd);
3216         if (ret != 0) {
3217                 talloc_free(state);
3218                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3219                 return -1;
3220         }
3221
3222         state->child = ctdb_fork(ctdb);
3223         if (state->child == (pid_t)-1) {
3224                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3225                 close(state->fd[0]);
3226                 state->fd[0] = -1;
3227                 close(state->fd[1]);
3228                 state->fd[1] = -1;
3229                 talloc_free(state);
3230                 return -1;
3231         }
3232
3233         if (state->child == 0) {
3234                 char cc = RECLOCK_OK;
3235                 close(state->fd[0]);
3236                 state->fd[0] = -1;
3237
3238                 ctdb_set_process_name("ctdb_rec_reclock");
3239                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3240                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3241                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3242                         cc = RECLOCK_FAILED;
3243                 }
3244
3245                 write(state->fd[1], &cc, 1);
3246                 /* make sure we die when our parent dies */
3247                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3248                         sleep(5);
3249                 }
3250                 _exit(0);
3251         }
3252         close(state->fd[1]);
3253         state->fd[1] = -1;
3254         set_close_on_exec(state->fd[0]);
3255
3256         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3257
3258         talloc_set_destructor(state, check_reclock_destructor);
3259
3260         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3261                                     ctdb_check_reclock_timeout, state);
3262         if (state->te == NULL) {
3263                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3264                 talloc_free(state);
3265                 return -1;
3266         }
3267
3268         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3269                                 EVENT_FD_READ,
3270                                 reclock_child_handler,
3271                                 (void *)state);
3272
3273         if (state->fde == NULL) {
3274                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3275                 talloc_free(state);
3276                 return -1;
3277         }
3278         tevent_fd_set_auto_close(state->fde);
3279
3280         while (state->status == RECLOCK_CHECKING) {
3281                 event_loop_once(ctdb->ev);
3282         }
3283
3284         if (state->status == RECLOCK_FAILED) {
3285                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3286                 close(ctdb->recovery_lock_fd);
3287                 ctdb->recovery_lock_fd = -1;
3288                 talloc_free(state);
3289                 return -1;
3290         }
3291
3292         talloc_free(state);
3293         return 0;
3294 }
3295
3296 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3297 {
3298         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3299         const char *reclockfile;
3300
3301         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3302                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3303                 talloc_free(tmp_ctx);
3304                 return -1;      
3305         }
3306
3307         if (reclockfile == NULL) {
3308                 if (ctdb->recovery_lock_file != NULL) {
3309                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3310                         talloc_free(ctdb->recovery_lock_file);
3311                         ctdb->recovery_lock_file = NULL;
3312                         if (ctdb->recovery_lock_fd != -1) {
3313                                 close(ctdb->recovery_lock_fd);
3314                                 ctdb->recovery_lock_fd = -1;
3315                         }
3316                 }
3317                 ctdb->tunable.verify_recovery_lock = 0;
3318                 talloc_free(tmp_ctx);
3319                 return 0;
3320         }
3321
3322         if (ctdb->recovery_lock_file == NULL) {
3323                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3324                 if (ctdb->recovery_lock_fd != -1) {
3325                         close(ctdb->recovery_lock_fd);
3326                         ctdb->recovery_lock_fd = -1;
3327                 }
3328                 talloc_free(tmp_ctx);
3329                 return 0;
3330         }
3331
3332
3333         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3334                 talloc_free(tmp_ctx);
3335                 return 0;
3336         }
3337
3338         talloc_free(ctdb->recovery_lock_file);
3339         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3340         ctdb->tunable.verify_recovery_lock = 0;
3341         if (ctdb->recovery_lock_fd != -1) {
3342                 close(ctdb->recovery_lock_fd);
3343                 ctdb->recovery_lock_fd = -1;
3344         }
3345
3346         talloc_free(tmp_ctx);
3347         return 0;
3348 }
3349
3350 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3351                       TALLOC_CTX *mem_ctx)
3352 {
3353         uint32_t pnn;
3354         struct ctdb_node_map *nodemap=NULL;
3355         struct ctdb_node_map *recmaster_nodemap=NULL;
3356         struct ctdb_node_map **remote_nodemaps=NULL;
3357         struct ctdb_vnn_map *vnnmap=NULL;
3358         struct ctdb_vnn_map *remote_vnnmap=NULL;
3359         int32_t debug_level;
3360         int i, j, ret;
3361         bool self_ban;
3362
3363
3364         /* verify that the main daemon is still running */
3365         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3366                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3367                 exit(-1);
3368         }
3369
3370         /* ping the local daemon to tell it we are alive */
3371         ctdb_ctrl_recd_ping(ctdb);
3372
3373         if (rec->election_timeout) {
3374                 /* an election is in progress */
3375                 return;
3376         }
3377
3378         /* read the debug level from the parent and update locally */
3379         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3380         if (ret !=0) {
3381                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3382                 return;
3383         }
3384         LogLevel = debug_level;
3385
3386         /* get relevant tunables */
3387         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3388         if (ret != 0) {
3389                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3390                 return;
3391         }
3392
3393         /* get the current recovery lock file from the server */
3394         if (update_recovery_lock_file(ctdb) != 0) {
3395                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3396                 return;
3397         }
3398
3399         /* Make sure that if recovery lock verification becomes disabled when
3400            we close the file
3401         */
3402         if (ctdb->tunable.verify_recovery_lock == 0) {
3403                 if (ctdb->recovery_lock_fd != -1) {
3404                         close(ctdb->recovery_lock_fd);
3405                         ctdb->recovery_lock_fd = -1;
3406                 }
3407         }
3408
3409         pnn = ctdb_get_pnn(ctdb);
3410
3411         /* get the vnnmap */
3412         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3413         if (ret != 0) {
3414                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3415                 return;
3416         }
3417
3418
3419         /* get number of nodes */
3420         if (rec->nodemap) {
3421                 talloc_free(rec->nodemap);
3422                 rec->nodemap = NULL;
3423                 nodemap=NULL;
3424         }
3425         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3426         if (ret != 0) {
3427                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3428                 return;
3429         }
3430         nodemap = rec->nodemap;
3431
3432         /* remember our own node flags */
3433         rec->node_flags = nodemap->nodes[pnn].flags;
3434
3435         ban_misbehaving_nodes(rec, &self_ban);
3436         if (self_ban) {
3437                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3438                 return;
3439         }
3440
3441         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3442            also frozen and that the recmode is set to active.
3443         */
3444         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3445                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3446                 if (ret != 0) {
3447                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3448                 }
3449                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3450                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3451
3452                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3453                         if (ret != 0) {
3454                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3455                                 return;
3456                         }
3457                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3458                         if (ret != 0) {
3459                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3460
3461                                 return;
3462                         }
3463                 }
3464
3465                 /* If this node is stopped or banned then it is not the recovery
3466                  * master, so don't do anything. This prevents stopped or banned
3467                  * node from starting election and sending unnecessary controls.
3468                  */
3469                 return;
3470         }
3471
3472         /* check which node is the recovery master */
3473         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3474         if (ret != 0) {
3475                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3476                 return;
3477         }
3478
3479         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3480         if (rec->recmaster != pnn) {
3481                 if (rec->ip_reallocate_ctx != NULL) {
3482                         talloc_free(rec->ip_reallocate_ctx);
3483                         rec->ip_reallocate_ctx = NULL;
3484                         rec->reallocate_callers = NULL;
3485                 }
3486         }
3487
3488         /* This is a special case.  When recovery daemon is started, recmaster
3489          * is set to -1.  If a node is not started in stopped state, then
3490          * start election to decide recovery master
3491          */
3492         if (rec->recmaster == (uint32_t)-1) {
3493                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3494                 force_election(rec, pnn, nodemap);
3495                 return;
3496         }
3497
3498         /* update the capabilities for all nodes */
3499         ret = update_capabilities(ctdb, nodemap);
3500         if (ret != 0) {
3501                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3502                 return;
3503         }
3504
3505         /*
3506          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3507          * but we have, then force an election and try to become the new
3508          * recmaster.
3509          */
3510         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3511             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3512              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3513                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3514                                   " but we (node %u) have - force an election\n",
3515                                   rec->recmaster, pnn));
3516                 force_election(rec, pnn, nodemap);
3517                 return;
3518         }
3519
3520         /* count how many active nodes there are */
3521         rec->num_active    = 0;
3522         rec->num_connected = 0;
3523         for (i=0; i<nodemap->num; i++) {
3524                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3525                         rec->num_active++;
3526                 }
3527                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3528                         rec->num_connected++;
3529                 }
3530         }
3531
3532
3533         /* verify that the recmaster node is still active */
3534         for (j=0; j<nodemap->num; j++) {
3535                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3536                         break;
3537                 }
3538         }
3539
3540         if (j == nodemap->num) {
3541                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3542                 force_election(rec, pnn, nodemap);
3543                 return;
3544         }
3545
3546         /* if recovery master is disconnected we must elect a new recmaster */
3547         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3548                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3549                 force_election(rec, pnn, nodemap);
3550                 return;
3551         }
3552
3553         /* get nodemap from the recovery master to check if it is inactive */
3554         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3555                                    mem_ctx, &recmaster_nodemap);
3556         if (ret != 0) {
3557                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3558                           nodemap->nodes[j].pnn));
3559                 return;
3560         }
3561
3562
3563         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3564             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3565                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3566                 /*
3567                  * update our nodemap to carry the recmaster's notion of
3568                  * its own flags, so that we don't keep freezing the
3569                  * inactive recmaster node...
3570                  */
3571                 nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
3572                 force_election(rec, pnn, nodemap);
3573                 return;
3574         }
3575
3576         /* verify that we have all ip addresses we should have and we dont
3577          * have addresses we shouldnt have.
3578          */ 
3579         if (ctdb->tunable.disable_ip_failover == 0) {
3580                 if (rec->ip_check_disable_ctx == NULL) {
3581                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3582                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3583                         }
3584                 }
3585         }
3586
3587
3588         /* if we are not the recmaster then we do not need to check
3589            if recovery is needed
3590          */
3591         if (pnn != rec->recmaster) {
3592                 return;
3593         }
3594
3595
3596         /* ensure our local copies of flags are right */
3597         ret = update_local_flags(rec, nodemap);
3598         if (ret == MONITOR_ELECTION_NEEDED) {
3599                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3600                 force_election(rec, pnn, nodemap);
3601                 return;
3602         }
3603         if (ret != MONITOR_OK) {
3604                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3605                 return;
3606         }
3607
3608         if (ctdb->num_nodes != nodemap->num) {
3609                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3610                 reload_nodes_file(ctdb);
3611                 return;
3612         }
3613
3614         /* verify that all active nodes agree that we are the recmaster */
3615         switch (verify_recmaster(rec, nodemap, pnn)) {
3616         case MONITOR_RECOVERY_NEEDED:
3617                 /* can not happen */
3618                 return;
3619         case MONITOR_ELECTION_NEEDED:
3620                 force_election(rec, pnn, nodemap);
3621                 return;
3622         case MONITOR_OK:
3623                 break;
3624         case MONITOR_FAILED:
3625                 return;
3626         }
3627
3628
3629         if (rec->need_recovery) {
3630                 /* a previous recovery didn't finish */
3631                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3632                 return;
3633         }
3634
3635         /* verify that all active nodes are in normal mode 
3636            and not in recovery mode 
3637         */
3638         switch (verify_recmode(ctdb, nodemap)) {
3639         case MONITOR_RECOVERY_NEEDED:
3640                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3641                 return;
3642         case MONITOR_FAILED:
3643                 return;
3644         case MONITOR_ELECTION_NEEDED:
3645                 /* can not happen */
3646         case MONITOR_OK:
3647                 break;
3648         }
3649
3650
3651         if (ctdb->tunable.verify_recovery_lock != 0) {
3652                 /* we should have the reclock - check its not stale */
3653                 ret = check_recovery_lock(ctdb);
3654                 if (ret != 0) {
3655                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3656                         ctdb_set_culprit(rec, ctdb->pnn);
3657                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3658                         return;
3659                 }
3660         }
3661
3662
3663         /* is there a pending reload all ips ? */
3664         if (reload_all_ips_request != NULL) {
3665                 reload_all_ips(ctdb, rec, nodemap, reload_all_ips_request);
3666                 talloc_free(reload_all_ips_request);
3667                 reload_all_ips_request = NULL;
3668         }
3669
3670         /* if there are takeovers requested, perform it and notify the waiters */
3671         if (rec->reallocate_callers) {
3672                 process_ipreallocate_requests(ctdb, rec);
3673         }
3674
3675         /* get the nodemap for all active remote nodes
3676          */
3677         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3678         if (remote_nodemaps == NULL) {
3679                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3680                 return;
3681         }
3682         for(i=0; i<nodemap->num; i++) {
3683                 remote_nodemaps[i] = NULL;
3684         }
3685         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3686                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3687                 return;
3688         } 
3689
3690         /* verify that all other nodes have the same nodemap as we have
3691         */
3692         for (j=0; j<nodemap->num; j++) {
3693                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3694                         continue;
3695                 }
3696
3697                 if (remote_nodemaps[j] == NULL) {
3698                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3699                         ctdb_set_culprit(rec, j);
3700
3701                         return;
3702                 }
3703
3704                 /* if the nodes disagree on how many nodes there are
3705                    then this is a good reason to try recovery
3706                  */
3707                 if (remote_nodemaps[j]->num != nodemap->num) {
3708                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3709                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3710                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3711                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3712                         return;
3713                 }
3714
3715                 /* if the nodes disagree on which nodes exist and are
3716                    active, then that is also a good reason to do recovery
3717                  */
3718                 for (i=0;i<nodemap->num;i++) {
3719                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3720                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3721                                           nodemap->nodes[j].pnn, i, 
3722                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3723                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3724                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3725                                             vnnmap);
3726                                 return;
3727                         }
3728                 }
3729         }
3730
3731         /*
3732          * Update node flags obtained from each active node. This ensure we have
3733          * up-to-date information for all the nodes.
3734          */
3735         for (j=0; j<nodemap->num; j++) {
3736                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3737                         continue;
3738                 }
3739                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3740         }
3741
3742         for (j=0; j<nodemap->num; j++) {
3743                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3744                         continue;
3745                 }
3746
3747                 /* verify the flags are consistent
3748                 */
3749                 for (i=0; i<nodemap->num; i++) {
3750                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3751                                 continue;
3752                         }
3753                         
3754                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3755                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3756                                   nodemap->nodes[j].pnn, 
3757                                   nodemap->nodes[i].pnn, 
3758                                   remote_nodemaps[j]->nodes[i].flags,
3759                                   nodemap->nodes[i].flags));
3760                                 if (i == j) {
3761                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3762                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3763                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3764                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3765                                                     vnnmap);
3766                                         return;
3767                                 } else {
3768                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3769                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3770                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3771                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3772                                                     vnnmap);
3773                                         return;
3774                                 }
3775                         }
3776                 }
3777         }
3778
3779
3780         /* there better be the same number of lmasters in the vnn map
3781            as there are active nodes or we will have to do a recovery
3782          */
3783         if (vnnmap->size != rec->num_active) {
3784                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3785                           vnnmap->size, rec->num_active));
3786                 ctdb_set_culprit(rec, ctdb->pnn);
3787                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3788                 return;
3789         }
3790
3791         /* verify that all active nodes in the nodemap also exist in 
3792            the vnnmap.
3793          */
3794         for (j=0; j<nodemap->num; j++) {
3795                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3796                         continue;
3797                 }
3798                 if (nodemap->nodes[j].pnn == pnn) {
3799                         continue;
3800                 }
3801
3802                 for (i=0; i<vnnmap->size; i++) {
3803                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3804                                 break;
3805                         }
3806                 }
3807                 if (i == vnnmap->size) {
3808                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3809                                   nodemap->nodes[j].pnn));
3810                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3811                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3812                         return;
3813                 }
3814         }
3815
3816         
3817         /* verify that all other nodes have the same vnnmap
3818            and are from the same generation
3819          */
3820         for (j=0; j<nodemap->num; j++) {
3821                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3822                         continue;
3823                 }
3824                 if (nodemap->nodes[j].pnn == pnn) {
3825                         continue;
3826                 }
3827
3828                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3829                                           mem_ctx, &remote_vnnmap);
3830                 if (ret != 0) {
3831                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3832                                   nodemap->nodes[j].pnn));
3833                         return;
3834                 }
3835
3836                 /* verify the vnnmap generation is the same */
3837                 if (vnnmap->generation != remote_vnnmap->generation) {
3838                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3839                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3840                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3841                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3842                         return;
3843                 }
3844
3845                 /* verify the vnnmap size is the same */
3846                 if (vnnmap->size != remote_vnnmap->size) {
3847                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3848                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3849                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3850                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3851                         return;
3852                 }
3853
3854                 /* verify the vnnmap is the same */
3855                 for (i=0;i<vnnmap->size;i++) {
3856                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3857                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3858                                           nodemap->nodes[j].pnn));
3859                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3860                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3861                                             vnnmap);
3862                                 return;
3863                         }
3864                 }
3865         }
3866
3867         /* we might need to change who has what IP assigned */
3868         if (rec->need_takeover_run) {
3869                 uint32_t culprit = (uint32_t)-1;
3870
3871                 rec->need_takeover_run = false;
3872
3873                 /* update the list of public ips that a node can handle for
3874                    all connected nodes
3875                 */
3876                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3877                 if (ret != 0) {
3878                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3879                                          culprit));
3880                         rec->need_takeover_run = true;
3881                         return;
3882                 }
3883
3884                 /* execute the "startrecovery" event script on all nodes */
3885                 ret = run_startrecovery_eventscript(rec, nodemap);
3886                 if (ret!=0) {
3887                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3888                         ctdb_set_culprit(rec, ctdb->pnn);
3889                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3890                         return;
3891                 }
3892
3893                 /* If takeover run fails, then the offending nodes are
3894                  * assigned ban culprit counts. And we re-try takeover.
3895                  * If takeover run fails repeatedly, the node would get
3896                  * banned.
3897                  *
3898                  * If rec->need_takeover_run is not set to true at this
3899                  * failure, monitoring is disabled cluster-wide (via
3900                  * startrecovery eventscript) and will not get enabled.
3901                  */
3902                 ret = ctdb_takeover_run(ctdb, nodemap, takeover_fail_callback, rec);
3903                 if (ret != 0) {
3904                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Trying again\n"));
3905                         return;
3906                 }
3907
3908                 /* execute the "recovered" event script on all nodes */
3909                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
3910 #if 0
3911 // we cant check whether the event completed successfully
3912 // since this script WILL fail if the node is in recovery mode
3913 // and if that race happens, the code here would just cause a second
3914 // cascading recovery.
3915                 if (ret!=0) {
3916                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3917                         ctdb_set_culprit(rec, ctdb->pnn);
3918                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3919                 }
3920 #endif
3921         }
3922 }
3923
3924 /*
3925   the main monitoring loop
3926  */
3927 static void monitor_cluster(struct ctdb_context *ctdb)
3928 {
3929         struct ctdb_recoverd *rec;
3930
3931         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3932
3933         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3934         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3935
3936         rec->ctdb = ctdb;
3937
3938         rec->priority_time = timeval_current();
3939
3940         /* register a message port for sending memory dumps */
3941         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3942
3943         /* register a message port for requesting logs */
3944         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_GETLOG, getlog_handler, rec);
3945
3946         /* register a message port for clearing logs */
3947         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_CLEARLOG, clearlog_handler, rec);
3948
3949         /* register a message port for recovery elections */
3950         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3951
3952         /* when nodes are disabled/enabled */
3953         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3954
3955         /* when we are asked to puch out a flag change */
3956         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3957
3958         /* register a message port for vacuum fetch */
3959         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3960
3961         /* register a message port for reloadnodes  */
3962         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3963
3964         /* register a message port for performing a takeover run */
3965         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3966
3967         /* register a message port for performing a reload all ips */
3968         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_ALL_IPS, ip_reloadall_handler, rec);
3969
3970         /* register a message port for disabling the ip check for a short while */
3971         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3972
3973         /* register a message port for updating the recovery daemons node assignment for an ip */
3974         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3975
3976         /* register a message port for forcing a rebalance of a node next
3977            reallocation */
3978         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3979
3980         for (;;) {
3981                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3982                 struct timeval start;
3983                 double elapsed;
3984
3985                 if (!mem_ctx) {
3986                         DEBUG(DEBUG_CRIT,(__location__
3987                                           " Failed to create temp context\n"));
3988                         exit(-1);
3989                 }
3990
3991                 start = timeval_current();
3992                 main_loop(ctdb, rec, mem_ctx);
3993                 talloc_free(mem_ctx);
3994
3995                 /* we only check for recovery once every second */
3996                 elapsed = timeval_elapsed(&start);
3997                 if (elapsed < ctdb->tunable.recover_interval) {
3998                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3999                                           - elapsed);
4000                 }
4001         }
4002 }
4003
4004 /*
4005   event handler for when the main ctdbd dies
4006  */
4007 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
4008                                  uint16_t flags, void *private_data)
4009 {
4010         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
4011         _exit(1);
4012 }
4013
4014 /*
4015   called regularly to verify that the recovery daemon is still running
4016  */
4017 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
4018                               struct timeval yt, void *p)
4019 {
4020         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
4021
4022         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
4023                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
4024
4025                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
4026                                 ctdb_restart_recd, ctdb);
4027
4028                 return;
4029         }
4030
4031         event_add_timed(ctdb->ev, ctdb->recd_ctx,
4032                         timeval_current_ofs(30, 0),
4033                         ctdb_check_recd, ctdb);
4034 }
4035
4036 static void recd_sig_child_handler(struct event_context *ev,
4037         struct signal_event *se, int signum, int count,
4038         void *dont_care, 
4039         void *private_data)
4040 {
4041 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4042         int status;
4043         pid_t pid = -1;
4044
4045         while (pid != 0) {
4046                 pid = waitpid(-1, &status, WNOHANG);
4047                 if (pid == -1) {
4048                         if (errno != ECHILD) {
4049                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4050                         }
4051                         return;
4052                 }
4053                 if (pid > 0) {
4054                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4055                 }
4056         }
4057 }
4058
4059 /*
4060   startup the recovery daemon as a child of the main ctdb daemon
4061  */
4062 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4063 {
4064         int fd[2];
4065         struct signal_event *se;
4066         struct tevent_fd *fde;
4067
4068         if (pipe(fd) != 0) {
4069                 return -1;
4070         }
4071
4072         ctdb->ctdbd_pid = getpid();
4073
4074         ctdb->recoverd_pid = ctdb_fork_no_free_ringbuffer(ctdb);
4075         if (ctdb->recoverd_pid == -1) {
4076                 return -1;
4077         }
4078
4079         if (ctdb->recoverd_pid != 0) {
4080                 talloc_free(ctdb->recd_ctx);
4081                 ctdb->recd_ctx = talloc_new(ctdb);
4082                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4083
4084                 close(fd[0]);
4085                 event_add_timed(ctdb->ev, ctdb->recd_ctx,
4086                                 timeval_current_ofs(30, 0),
4087                                 ctdb_check_recd, ctdb);
4088                 return 0;
4089         }
4090
4091         close(fd[1]);
4092
4093         srandom(getpid() ^ time(NULL));
4094
4095         /* Clear the log ringbuffer */
4096         ctdb_clear_log(ctdb);
4097
4098         ctdb_set_process_name("ctdb_recovered");
4099         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4100                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
4101                 exit(1);
4102         }
4103
4104         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4105
4106         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
4107                      ctdb_recoverd_parent, &fd[0]);
4108         tevent_fd_set_auto_close(fde);
4109
4110         /* set up a handler to pick up sigchld */
4111         se = event_add_signal(ctdb->ev, ctdb,
4112                                      SIGCHLD, 0,
4113                                      recd_sig_child_handler,
4114                                      ctdb);
4115         if (se == NULL) {
4116                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4117                 exit(1);
4118         }
4119
4120         monitor_cluster(ctdb);
4121
4122         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4123         return -1;
4124 }
4125
4126 /*
4127   shutdown the recovery daemon
4128  */
4129 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4130 {
4131         if (ctdb->recoverd_pid == 0) {
4132                 return;
4133         }
4134
4135         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4136         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4137
4138         TALLOC_FREE(ctdb->recd_ctx);
4139         TALLOC_FREE(ctdb->recd_ping_count);
4140 }
4141
4142 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
4143                        struct timeval t, void *private_data)
4144 {
4145         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4146
4147         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4148         ctdb_stop_recoverd(ctdb);
4149         ctdb_start_recoverd(ctdb);
4150 }