recoverd: Factor out the SRVID handling code
[ctdb.git] / server / ctdb_recoverd.c
/*
   ctdb recovery daemon

   Copyright (C) Ronnie Sahlberg  2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/

#include "includes.h"
#include "system/filesys.h"
#include "system/time.h"
#include "system/network.h"
#include "system/wait.h"
#include "popt.h"
#include "cmdline.h"
#include "../include/ctdb_client.h"
#include "../include/ctdb_private.h"
#include "db_wrap.h"
#include "dlinklist.h"


/* The most recent "reload all IPs" request; it is processed during the
   next monitoring loop.
*/
struct reloadips_all_reply *reload_all_ips_request = NULL;

/* List of SRVID requests that need to be processed */
struct srvid_list {
        struct srvid_list *next, *prev;
        struct srvid_request *request;
};

struct srvid_requests {
        struct srvid_list *requests;
};

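/* Reply to a single queued SRVID request and free it.  A request sent
 * with srvid == 0 indicates that the sender does not expect a reply.
 */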
static void srvid_request_reply(struct ctdb_context *ctdb,
                                struct srvid_request *request,
                                TDB_DATA result)
{
        /* Someone that sent srvid==0 does not want a reply */
        if (request->srvid == 0) {
                talloc_free(request);
                return;
        }

        if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
                                     result) == 0) {
                DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
                                  (unsigned)request->pnn,
                                  (unsigned long long)request->srvid));
        } else {
                DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
                                 (unsigned)request->pnn,
                                 (unsigned long long)request->srvid));
        }

        talloc_free(request);
}

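/* Reply to every request in a list with the same result, then free the
 * whole list.
 */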
static void srvid_requests_reply(struct ctdb_context *ctdb,
                                 struct srvid_requests **requests,
                                 TDB_DATA result)
{
        struct srvid_list *r;

        for (r = (*requests)->requests; r != NULL; r = r->next) {
                srvid_request_reply(ctdb, r->request, result);
        }

        /* Free the list structure... */
        TALLOC_FREE(*requests);
}

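/* Queue a request for later processing.  The request is talloc_stolen
 * onto the list; if it cannot be queued, a failure reply is sent
 * immediately.
 *
 * Illustrative usage (a sketch only): a message handler defers an
 * incoming request, and the whole batch is answered once the deferred
 * work completes, e.g.
 *
 *      srvid_request_add(ctdb, &rec->reallocate_requests, request);
 *      ...
 *      srvid_requests_reply(ctdb, &rec->reallocate_requests, result);
 *
 * as with the reallocate_requests list in struct ctdb_recoverd below.
 */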
static void srvid_request_add(struct ctdb_context *ctdb,
                              struct srvid_requests **requests,
                              struct srvid_request *request)
{
        struct srvid_list *t;
        int32_t ret;
        TDB_DATA result;

        if (*requests == NULL) {
                *requests = talloc_zero(ctdb, struct srvid_requests);
                if (*requests == NULL) {
                        goto nomem;
                }
        }

        t = talloc_zero(*requests, struct srvid_list);
        if (t == NULL) {
                /* If *requests was just allocated above then free it */
                if ((*requests)->requests == NULL) {
                        TALLOC_FREE(*requests);
                }
                goto nomem;
        }

        t->request = (struct srvid_request *)talloc_steal(t, request);
        DLIST_ADD((*requests)->requests, t);

        return;

nomem:
        /* Failed to add the request to the list.  Send a fail. */
        DEBUG(DEBUG_ERR, (__location__
                          " Out of memory, failed to queue SRVID request\n"));
        ret = -ENOMEM;
        result.dsize = sizeof(ret);
        result.dptr = (uint8_t *)&ret;
        srvid_request_reply(ctdb, request, result);
}

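/* Per-node state used to decide when a misbehaving node should be
 * banned: a count of culprit credits and the time trouble was last
 * reported.
 */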
struct ctdb_banning_state {
        uint32_t count;
        struct timeval last_reported_time;
};

/*
  private state of recovery daemon
 */
struct ctdb_recoverd {
        struct ctdb_context *ctdb;
        uint32_t recmaster;
        uint32_t num_active;
        uint32_t num_connected;
        uint32_t last_culprit_node;
        struct ctdb_node_map *nodemap;
        struct timeval priority_time;
        bool need_takeover_run;
        bool need_recovery;
        uint32_t node_flags;
        struct timed_event *send_election_te;
        struct timed_event *election_timeout;
        struct vacuum_info *vacuum_info;
        struct srvid_requests *reallocate_requests;
        bool takeover_run_in_progress;
        TALLOC_CTX *ip_check_disable_ctx;
        struct ctdb_control_get_ifaces *ifaces;
        TALLOC_CTX *deferred_rebalance_ctx;
};

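/* Timeouts for recovery controls and for the monitoring loop, taken
 * from the recover_timeout and recover_interval tunables.
 */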
#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)

static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);

/*
  ban a node for a period of time
 */
static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
{
        int ret;
        struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_ban_time bantime;

        if (!ctdb_validate_pnn(ctdb, pnn)) {
                DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
                return;
        }

        DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));

        bantime.pnn  = pnn;
        bantime.time = ban_time;

        ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %u\n", pnn));
                return;
        }
}

enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};


/*
  remember the trouble maker
 */
static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
{
        struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
        struct ctdb_banning_state *ban_state;

        if (culprit >= ctdb->num_nodes) {
                DEBUG(DEBUG_ERR,("Trying to set culprit %u but num_nodes is %u\n", culprit, ctdb->num_nodes));
                return;
        }

        /* If we are banned or stopped, do not set other nodes as culprits */
        if (rec->node_flags & NODE_FLAGS_INACTIVE) {
                DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %u\n", culprit));
                return;
        }

        if (ctdb->nodes[culprit]->ban_state == NULL) {
                ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
                CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
        }
        ban_state = ctdb->nodes[culprit]->ban_state;
        if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
                /* this was the first time in a long while this node
                   misbehaved, so we will forgive any old transgressions.
                */
                ban_state->count = 0;
        }

        ban_state->count += count;
        ban_state->last_reported_time = timeval_current();
        rec->last_culprit_node = culprit;
}

/*
  remember the trouble maker, adding a single credit
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
        ctdb_set_culprit_count(rec, culprit, 1);
}


/* this callback is called for every node that failed to execute the
   recovered event
*/
static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));

        ctdb_set_culprit(rec, node_pnn);
}

/*
  run the "recovered" eventscript on all nodes
 */
static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
{
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;
        struct ctdb_context *ctdb = rec->ctdb;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL, recovered_fail_callback,
                                        rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));

                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

/* this callback is called for every node that failed to execute the
   start recovery event
*/
static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));

        ctdb_set_culprit(rec, node_pnn);
}

/*
  run the "startrecovery" eventscript on all nodes
 */
static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
{
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;
        struct ctdb_context *ctdb = rec->ctdb;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL,
                                        startrecovery_fail_callback,
                                        rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

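/* Callback for async GET_CAPABILITIES: cache each node's capabilities
 * in the node array and, for the local node, in the ctdb context.
 */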
static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
                DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
                return;
        }
        if (node_pnn < ctdb->num_nodes) {
                ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
        }

        if (node_pnn == ctdb->pnn) {
                ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
        }
}

/*
  update the node capabilities for all connected nodes
 */
static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
{
        uint32_t *nodes;
        TALLOC_CTX *tmp_ctx;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(),
                                        false, tdb_null,
                                        async_getcap_callback, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

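/* Failure callbacks used while setting the recovery mode: a node that
 * fails to freeze or to start the recovery transaction is charged
 * nodemap->num culprit credits in one go.
 */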
static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
        ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
}

static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
        ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
}

/*
  change recovery mode on all nodes
 */
static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
{
        TDB_DATA data;
        uint32_t *nodes;
        TALLOC_CTX *tmp_ctx;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        /* freeze all nodes */
        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (rec_mode == CTDB_RECOVERY_ACTIVE) {
                int i;

                for (i=1; i<=NUM_DB_PRIORITIES; i++) {
                        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
                                                nodes, i,
                                                CONTROL_TIMEOUT(),
                                                false, tdb_null,
                                                NULL,
                                                set_recmode_fail_callback,
                                                rec) != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
                                talloc_free(tmp_ctx);
                                return -1;
                        }
                }
        }

        data.dsize = sizeof(uint32_t);
        data.dptr = (unsigned char *)&rec_mode;

        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(),
                                        false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

/*
  change the recovery master on all nodes
 */
static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
{
        TDB_DATA data;
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        data.dsize = sizeof(uint32_t);
        data.dptr = (unsigned char *)&pnn;

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

/* Update all remote nodes to use the same db priority that we have.
   This can fail if a remote node has not yet been upgraded to support
   this function, so we always return success and never fail a recovery
   because of this call.
*/
static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
        struct ctdb_node_map *nodemap,
        uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
{
        int db;
        uint32_t *nodes;

        nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);

        /* step through all local databases */
        for (db=0; db<dbmap->num;db++) {
                TDB_DATA data;
                struct ctdb_db_priority db_prio;
                int ret;

                db_prio.db_id     = dbmap->dbs[db].dbid;
                ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
                        continue;
                }

                DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority));

                data.dptr  = (uint8_t *)&db_prio;
                data.dsize = sizeof(db_prio);

                if (ctdb_client_async_control(ctdb,
                                        CTDB_CONTROL_SET_DB_PRIORITY,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
                }
        }

        return 0;
}

/*
  ensure all other nodes have attached to any databases that we have
 */
static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
                                           uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map *remote_dbmap;

        /* verify that all other nodes have all our databases */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
                        return -1;
                }

                /* step through all local databases */
                for (db=0; db<dbmap->num;db++) {
                        const char *name;

                        for (i=0;i<remote_dbmap->num;i++) {
                                if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
                                        break;
                                }
                        }
                        /* the remote node already has this database */
                        if (i!=remote_dbmap->num) {
                                continue;
                        }
                        /* ok so we need to create this database */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid,
                                                  mem_ctx, &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                                 mem_ctx, name,
                                                 dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
                                return -1;
                        }
                }
        }

        return 0;
}


/*
  ensure we are attached to any databases that anyone else is attached to
 */
static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
                                          uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map *remote_dbmap;

        /* verify that we have all databases any other node has */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
                        return -1;
                }

                /* step through all databases on the remote node */
                for (db=0; db<remote_dbmap->num;db++) {
                        const char *name;

                        for (i=0;i<(*dbmap)->num;i++) {
                                if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
                                        break;
                                }
                        }
                        /* we already have this db locally */
                        if (i!=(*dbmap)->num) {
                                continue;
                        }
                        /* ok so we need to create this database and
                           rebuild dbmap
                         */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                                  remote_dbmap->dbs[db].dbid, mem_ctx, &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n",
                                          nodemap->nodes[j].pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name,
                                                 remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
                                return -1;
                        }
                        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
                                return -1;
                        }
                }
        }

        return 0;
}


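/* Records are merged by RSN: a pulled record only replaces an existing
 * recdb copy if its RSN is higher, or if the RSNs are equal and the
 * existing copy's dmaster is not the recovery master.
 */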
/*
  pull the remote database contents from one node into the recdb
 */
static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
                                    struct tdb_wrap *recdb, uint32_t dbid)
{
        int ret;
        TDB_DATA outdata;
        struct ctdb_marshall_buffer *reply;
        struct ctdb_rec_data *rec;
        int i;
        TALLOC_CTX *tmp_ctx = talloc_new(recdb);

        ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
                               CONTROL_TIMEOUT(), &outdata);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
                talloc_free(tmp_ctx);
                return -1;
        }

        reply = (struct ctdb_marshall_buffer *)outdata.dptr;

        if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
                DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        rec = (struct ctdb_rec_data *)&reply->data[0];

        for (i=0;
             i<reply->count;
             rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
                TDB_DATA key, data;
                struct ctdb_ltdb_header *hdr;
                TDB_DATA existing;

                key.dptr = &rec->data[0];
                key.dsize = rec->keylen;
                data.dptr = &rec->data[key.dsize];
                data.dsize = rec->datalen;

                hdr = (struct ctdb_ltdb_header *)data.dptr;

                if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                        DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
                        talloc_free(tmp_ctx);
                        return -1;
                }

                /* fetch the existing record, if any */
                existing = tdb_fetch(recdb->tdb, key);

                if (existing.dptr != NULL) {
                        struct ctdb_ltdb_header header;
                        if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
                                DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n",
                                         (unsigned)existing.dsize, srcnode));
                                free(existing.dptr);
                                talloc_free(tmp_ctx);
                                return -1;
                        }
                        header = *(struct ctdb_ltdb_header *)existing.dptr;
                        free(existing.dptr);
                        if (!(header.rsn < hdr->rsn ||
                              (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
                                continue;
                        }
                }

                if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
                        DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
                        talloc_free(tmp_ctx);
                        return -1;
                }
        }

        talloc_free(tmp_ctx);

        return 0;
}


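/* State shared by the GET_DB_SEQNUM callbacks below: the highest
 * sequence number seen so far and the node that reported it.
 */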
struct pull_seqnum_cbdata {
        int failed;
        uint32_t pnn;
        uint64_t seqnum;
};

static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
        uint64_t seqnum;

        if (cb_data->failed != 0) {
                DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
                return;
        }

        if (res != 0) {
                DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
                cb_data->failed = 1;
                return;
        }

        if (outdata.dsize != sizeof(uint64_t)) {
                DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
                cb_data->failed = 1;
                return;
        }

        seqnum = *((uint64_t *)outdata.dptr);

        if (seqnum > cb_data->seqnum) {
                cb_data->seqnum = seqnum;
                cb_data->pnn = node_pnn;
        }
}

static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);

        DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
        cb_data->failed = 1;
}

static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
                                struct ctdb_recoverd *rec,
                                struct ctdb_node_map *nodemap,
                                struct tdb_wrap *recdb, uint32_t dbid)
{
        TALLOC_CTX *tmp_ctx = talloc_new(NULL);
        uint32_t *nodes;
        TDB_DATA data;
        uint32_t outdata[2];
        struct pull_seqnum_cbdata *cb_data;

        DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));

        outdata[0] = dbid;
        outdata[1] = 0;

        data.dsize = sizeof(outdata);
        data.dptr  = (uint8_t *)&outdata[0];

        cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
        if (cb_data == NULL) {
                DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        cb_data->failed = 0;
        cb_data->pnn    = -1;
        cb_data->seqnum = 0;

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        pull_seqnum_cb,
                                        pull_seqnum_fail_cb,
                                        cb_data) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));

                talloc_free(tmp_ctx);
                return -1;
        }

        if (cb_data->failed != 0) {
                DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
                talloc_free(tmp_ctx);
                return -1;
        }

        if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
                DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
                talloc_free(tmp_ctx);
                return -1;
        }

        DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum));

        if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
                DEBUG(DEBUG_ERR, ("Failed to pull highest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}


/*
  pull all the remote database contents into the recdb
 */
static int pull_remote_database(struct ctdb_context *ctdb,
                                struct ctdb_recoverd *rec,
                                struct ctdb_node_map *nodemap,
                                struct tdb_wrap *recdb, uint32_t dbid,
                                bool persistent)
{
        int j;

        if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
                int ret;
                ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
                if (ret == 0) {
                        return 0;
                }
        }

        /* pull all records from all other nodes across onto this node
           (this merges based on rsn)
        */
        for (j=0; j<nodemap->num; j++) {
                /* don't merge from nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
                if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n",
                                 nodemap->nodes[j].pnn));
                        ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
                        return -1;
                }
        }

        return 0;
}


/*
  update flags on all active nodes
 */
static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
{
        int ret;

        ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                return -1;
        }

        return 0;
}

/*
  ensure all nodes have the same vnnmap we do
 */
static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
                                      uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
{
        int j, ret;

        /* push the new vnn map out to all the nodes */
        for (j=0; j<nodemap->num; j++) {
                /* don't push to nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
                        return -1;
                }
        }

        return 0;
}


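/* One entry in the recovery daemon's list of in-progress vacuum
 * fetches, keyed by source node and database.
 */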
struct vacuum_info {
        struct vacuum_info *next, *prev;
        struct ctdb_recoverd *rec;
        uint32_t srcnode;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_marshall_buffer *recs;
        struct ctdb_rec_data *r;
};

static void vacuum_fetch_next(struct vacuum_info *v);

/*
  called when a vacuum fetch has completed - just free it and do the next one
 */
static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
{
        struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
        talloc_free(state);
        vacuum_fetch_next(v);
}


/*
  process the next element from the vacuum list
*/
static void vacuum_fetch_next(struct vacuum_info *v)
{
        struct ctdb_call call;
        struct ctdb_rec_data *r;

        while (v->recs->count) {
                struct ctdb_client_call_state *state;
                TDB_DATA data;
                struct ctdb_ltdb_header *hdr;

                ZERO_STRUCT(call);
                call.call_id = CTDB_NULL_FUNC;
                call.flags = CTDB_IMMEDIATE_MIGRATION;
                call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;

                r = v->r;
                v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
                v->recs->count--;

                call.key.dptr = &r->data[0];
                call.key.dsize = r->keylen;

                /* ensure we don't block this daemon - just skip a record if we can't get
                   the chainlock */
                if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
                        continue;
                }

                data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
                if (data.dptr == NULL) {
                        tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                        continue;
                }

                if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                        free(data.dptr);
                        tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                        continue;
                }

                hdr = (struct ctdb_ltdb_header *)data.dptr;
                if (hdr->dmaster == v->rec->ctdb->pnn) {
                        /* it's already local */
                        free(data.dptr);
                        tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                        continue;
                }

                free(data.dptr);

                state = ctdb_call_send(v->ctdb_db, &call);
                tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
                if (state == NULL) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
                        talloc_free(v);
                        return;
                }
                state->async.fn = vacuum_fetch_callback;
                state->async.private_data = v;
                return;
        }

        talloc_free(v);
}


/*
  destroy a vacuum info structure
 */
static int vacuum_info_destructor(struct vacuum_info *v)
{
        DLIST_REMOVE(v->rec->vacuum_info, v);
        return 0;
}


/*
  handler for vacuum fetch: the message carries a marshall buffer of
  records, with the first record's reqid identifying the source node
*/
static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid,
                                 TDB_DATA data, void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
        struct ctdb_marshall_buffer *recs;
        int ret, i;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        const char *name;
        struct ctdb_dbid_map *dbmap=NULL;
        bool persistent = false;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_rec_data *r;
        uint32_t srcnode;
        struct vacuum_info *v;

        recs = (struct ctdb_marshall_buffer *)data.dptr;
        r = (struct ctdb_rec_data *)&recs->data[0];

        if (recs->count == 0) {
                talloc_free(tmp_ctx);
                return;
        }

        srcnode = r->reqid;

        for (v=rec->vacuum_info;v;v=v->next) {
                if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
                        /* we're already working on records from this node */
                        talloc_free(tmp_ctx);
                        return;
                }
        }

        /* work out if the database is persistent */
        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
                talloc_free(tmp_ctx);
                return;
        }

        for (i=0;i<dbmap->num;i++) {
                if (dbmap->dbs[i].dbid == recs->db_id) {
                        persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
                        break;
                }
        }
        if (i == dbmap->num) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
                talloc_free(tmp_ctx);
                return;
        }

        /* find the name of this database */
        if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
                talloc_free(tmp_ctx);
                return;
        }

        /* attach to it */
        ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
                talloc_free(tmp_ctx);
                return;
        }

        v = talloc_zero(rec, struct vacuum_info);
        if (v == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
                talloc_free(tmp_ctx);
                return;
        }

        v->rec = rec;
        v->srcnode = srcnode;
        v->ctdb_db = ctdb_db;
        v->recs = talloc_memdup(v, recs, data.dsize);
        if (v->recs == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
                talloc_free(v);
                talloc_free(tmp_ctx);
                return;
        }
        v->r =  (struct ctdb_rec_data *)&v->recs->data[0];

        DLIST_ADD(rec->vacuum_info, v);

        talloc_set_destructor(v, vacuum_info_destructor);

        vacuum_fetch_next(v);
        talloc_free(tmp_ctx);
}


/*
  called when ctdb_wait_timeout should finish
 */
static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te,
                              struct timeval yt, void *p)
{
        uint32_t *timed_out = (uint32_t *)p;
        (*timed_out) = 1;
}

/*
  wait for a given number of seconds
 */
static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
{
        uint32_t timed_out = 0;
        time_t usecs = (secs - (time_t)secs) * 1000000;
        event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
        while (!timed_out) {
                event_loop_once(ctdb->ev);
        }
}

/*
  called when an election times out (ends)
 */
static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te,
                                  struct timeval t, void *p)
{
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
        rec->election_timeout = NULL;
        fast_start = false;

        DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
}


/*
  wait for an election to finish.  It finishes election_timeout seconds
  after the last election packet is received
 */
static void ctdb_wait_election(struct ctdb_recoverd *rec)
{
        struct ctdb_context *ctdb = rec->ctdb;
        while (rec->election_timeout) {
                event_loop_once(ctdb->ev);
        }
}

/*
  Update our local flags from all remote connected nodes.
  This is only run when we are, or we believe we are, the recovery master
 */
static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
{
        int j;
        struct ctdb_context *ctdb = rec->ctdb;
        TALLOC_CTX *mem_ctx = talloc_new(ctdb);

        /* get the nodemap for all active remote nodes and verify
           they are the same as for this node
         */
        for (j=0; j<nodemap->num; j++) {
                struct ctdb_node_map *remote_nodemap=NULL;
                int ret;

                if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
                        continue;
                }
                if (nodemap->nodes[j].pnn == ctdb->pnn) {
                        continue;
                }

                ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                           mem_ctx, &remote_nodemap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n",
                                  nodemap->nodes[j].pnn));
                        ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                        talloc_free(mem_ctx);
                        return MONITOR_FAILED;
                }
                if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
                        /* We should tell our daemon about this so it
                           updates its flags or else we will log the same
                           message again in the next iteration of recovery.
                           Since we are the recovery master we can just as
                           well update the flags on all nodes.
                        */
                        ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                                talloc_free(mem_ctx);
                                return MONITOR_FAILED;
                        }

                        /* Update our local copy of the flags in the recovery
                           daemon.
                        */
                        DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
                                 nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
                                 nodemap->nodes[j].flags));
                        nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
                }
                talloc_free(remote_nodemap);
        }
        talloc_free(mem_ctx);
        return MONITOR_OK;
}


/* Create a new random generation id.
   The generation id cannot be the INVALID_GENERATION id
*/
static uint32_t new_generation(void)
{
        uint32_t generation;

        while (1) {
                generation = random();

                if (generation != INVALID_GENERATION) {
                        break;
                }
        }

        return generation;
}


/*
  create a temporary working database
 */
static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
{
        char *name;
        struct tdb_wrap *recdb;
        unsigned tdb_flags;

        /* open up the temporary recovery database */
        name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
                               ctdb->db_directory_state,
                               ctdb->pnn);
        if (name == NULL) {
                return NULL;
        }
        unlink(name);

        tdb_flags = TDB_NOLOCK;
        if (ctdb->valgrinding) {
                tdb_flags |= TDB_NOMMAP;
        }
        tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);

        recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size,
                              tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
        if (recdb == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
        }

        talloc_free(name);

        return recdb;
}


/*
   a traverse function for pulling all relevant records from recdb
 */
struct recdb_data {
        struct ctdb_context *ctdb;
        struct ctdb_marshall_buffer *recdata;
        uint32_t len;
        uint32_t allocated_len;
        bool failed;
        bool persistent;
};

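/* Marshall one record into params->recdata, growing the buffer in
 * pulldb_preallocation_size chunks as needed.
 */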
static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
{
        struct recdb_data *params = (struct recdb_data *)p;
        struct ctdb_rec_data *rec;
        struct ctdb_ltdb_header *hdr;

        /*
         * skip empty records - but NOT for persistent databases:
         *
         * The record-by-record mode of recovery deletes empty records.
         * For persistent databases, this can lead to data corruption
         * by deleting records that should be there:
         *
         * - Assume the cluster has been running for a while.
         *
         * - A record R in a persistent database has been created and
         *   deleted a couple of times, the last operation being deletion,
         *   leaving an empty record with a high RSN, say 10.
         *
         * - Now a node N is turned off.
         *
         * - This leaves the local copy of the database on N with the empty
         *   copy of R and RSN 10. On all other nodes, the recovery has deleted
         *   the copy of record R.
         *
         * - Now the record is created again while node N is turned off.
         *   This creates R with RSN = 1 on all nodes except for N.
         *
         * - Now node N is turned on again. The following recovery will choose
         *   the older empty copy of R due to RSN 10 > RSN 1.
         *
         * ==> Hence the record is gone after the recovery.
         *
         * On databases like Samba's registry, this can damage the higher-level
         * data structures built from the various tdb-level records.
         */
        if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
                return 0;
        }

        /* update the dmaster field to point to us */
        hdr = (struct ctdb_ltdb_header *)data.dptr;
        if (!params->persistent) {
                hdr->dmaster = params->ctdb->pnn;
                hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
        }

        /* add the record to the blob ready to send to the nodes */
        rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
        if (rec == NULL) {
                params->failed = true;
                return -1;
        }
        if (params->len + rec->length >= params->allocated_len) {
                params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
                params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
        }
        if (params->recdata == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
                         rec->length + params->len));
                params->failed = true;
                return -1;
        }
        params->recdata->count++;
        memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
        params->len += rec->length;
        talloc_free(rec);

        return 0;
}
1354
1355 /*
1356   push the recdb database out to all nodes
1357  */
1358 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1359                                bool persistent,
1360                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1361 {
1362         struct recdb_data params;
1363         struct ctdb_marshall_buffer *recdata;
1364         TDB_DATA outdata;
1365         TALLOC_CTX *tmp_ctx;
1366         uint32_t *nodes;
1367
1368         tmp_ctx = talloc_new(ctdb);
1369         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1370
1371         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1372         CTDB_NO_MEMORY(ctdb, recdata);
1373
1374         recdata->db_id = dbid;
1375
1376         params.ctdb = ctdb;
1377         params.recdata = recdata;
1378         params.len = offsetof(struct ctdb_marshall_buffer, data);
1379         params.allocated_len = params.len;
1380         params.failed = false;
1381         params.persistent = persistent;
1382
1383         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1384                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1385                 talloc_free(params.recdata);
1386                 talloc_free(tmp_ctx);
1387                 return -1;
1388         }
1389
1390         if (params.failed) {
1391                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1392                 talloc_free(params.recdata);
1393                 talloc_free(tmp_ctx);
1394                 return -1;              
1395         }
1396
1397         recdata = params.recdata;
1398
1399         outdata.dptr = (void *)recdata;
1400         outdata.dsize = params.len;
1401
1402         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1403         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1404                                         nodes, 0,
1405                                         CONTROL_TIMEOUT(), false, outdata,
1406                                         NULL, NULL,
1407                                         NULL) != 0) {
1408                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1409                 talloc_free(recdata);
1410                 talloc_free(tmp_ctx);
1411                 return -1;
1412         }
1413
1414         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x with %u records\n",
1415                   dbid, recdata->count));
1416
1417         talloc_free(recdata);
1418         talloc_free(tmp_ctx);
1419
1420         return 0;
1421 }
1422
1423
1424 /*
1425   go through a full recovery on one database 
1426  */
1427 static int recover_database(struct ctdb_recoverd *rec, 
1428                             TALLOC_CTX *mem_ctx,
1429                             uint32_t dbid,
1430                             bool persistent,
1431                             uint32_t pnn, 
1432                             struct ctdb_node_map *nodemap,
1433                             uint32_t transaction_id)
1434 {
1435         struct tdb_wrap *recdb;
1436         int ret;
1437         struct ctdb_context *ctdb = rec->ctdb;
1438         TDB_DATA data;
1439         struct ctdb_control_wipe_database w;
1440         uint32_t *nodes;
1441
1442         recdb = create_recdb(ctdb, mem_ctx);
1443         if (recdb == NULL) {
1444                 return -1;
1445         }
1446
1447         /* pull all remote databases onto the recdb */
1448         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1449         if (ret != 0) {
1450                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1451                 return -1;
1452         }
1453
1454         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1455
1456         /* wipe all the remote databases. This is safe as we are in a transaction */
1457         w.db_id = dbid;
1458         w.transaction_id = transaction_id;
1459
1460         data.dptr = (void *)&w;
1461         data.dsize = sizeof(w);
1462
1463         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1464         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1465                                         nodes, 0,
1466                                         CONTROL_TIMEOUT(), false, data,
1467                                         NULL, NULL,
1468                                         NULL) != 0) {
1469                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1470                 talloc_free(recdb);
1471                 return -1;
1472         }
1473         
1474         /* push out the correct database. This sets the dmaster and skips 
1475            the empty records */
1476         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1477         if (ret != 0) {
1478                 talloc_free(recdb);
1479                 return -1;
1480         }
1481
1482         /* all done with this database */
1483         talloc_free(recdb);
1484
1485         return 0;
1486 }
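
/* Usage sketch: recover_database() is driven once per database from
   do_recovery() below, inside the cluster-wide TRANSACTION_START /
   TRANSACTION_COMMIT bracket, so the wipe+push appears atomic to the
   other nodes.  Mirroring the loop further down:

	for (i = 0; i < dbmap->num; i++) {
		ret = recover_database(rec, mem_ctx,
				       dbmap->dbs[i].dbid,
				       dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
				       pnn, nodemap, generation);
		if (ret != 0) {
			return -1;
		}
	}
*/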
1487
1488 /*
1489   reload the nodes file 
1490 */
1491 static void reload_nodes_file(struct ctdb_context *ctdb)
1492 {
1493         ctdb->nodes = NULL;
1494         ctdb_load_nodes_file(ctdb);
1495 }
1496
1497 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1498                                          struct ctdb_recoverd *rec,
1499                                          struct ctdb_node_map *nodemap,
1500                                          uint32_t *culprit)
1501 {
1502         int j;
1503         int ret;
1504
1505         if (ctdb->num_nodes != nodemap->num) {
1506                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1507                                   ctdb->num_nodes, nodemap->num));
1508                 if (culprit) {
1509                         *culprit = ctdb->pnn;
1510                 }
1511                 return -1;
1512         }
1513
1514         for (j=0; j<nodemap->num; j++) {
1515                 /* For readability */
1516                 struct ctdb_node *node = ctdb->nodes[j];
1517
1518                 /* release any existing data */
1519                 if (node->known_public_ips) {
1520                         talloc_free(node->known_public_ips);
1521                         node->known_public_ips = NULL;
1522                 }
1523                 if (node->available_public_ips) {
1524                         talloc_free(node->available_public_ips);
1525                         node->available_public_ips = NULL;
1526                 }
1527
1528                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1529                         continue;
1530                 }
1531
1532                 /* Retrieve the list of known public IPs from the node */
1533                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1534                                         CONTROL_TIMEOUT(),
1535                                         node->pnn,
1536                                         ctdb->nodes,
1537                                         0,
1538                                         &node->known_public_ips);
1539                 if (ret != 0) {
1540                         DEBUG(DEBUG_ERR,
1541                               ("Failed to read known public IPs from node: %u\n",
1542                                node->pnn));
1543                         if (culprit) {
1544                                 *culprit = node->pnn;
1545                         }
1546                         return -1;
1547                 }
1548
1549                 if (ctdb->do_checkpublicip &&
1550                     (rec->ip_check_disable_ctx == NULL) &&
1551                     verify_remote_ip_allocation(ctdb,
1552                                                  node->known_public_ips,
1553                                                  node->pnn)) {
1554                         DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
1555                         rec->need_takeover_run = true;
1556                 }
1557
1558                 /* Retrieve the list of available public IPs from the node */
1559                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1560                                         CONTROL_TIMEOUT(),
1561                                         node->pnn,
1562                                         ctdb->nodes,
1563                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1564                                         &node->available_public_ips);
1565                 if (ret != 0) {
1566                         DEBUG(DEBUG_ERR,
1567                               ("Failed to read available public IPs from node: %u\n",
1568                                node->pnn));
1569                         if (culprit) {
1570                                 *culprit = node->pnn;
1571                         }
1572                         return -1;
1573                 }
1574         }
1575
1576         return 0;
1577 }
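
/* Note on the two fetches above: both use ctdb_ctrl_get_public_ips_flags()
   and differ only in the flags argument.  Passing 0 asks the node for every
   public IP it knows about, while CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE
   restricts the answer to the addresses the node could host right now.
   Sketch, assuming pnn and mem_ctx are in scope:

	struct ctdb_all_public_ips *known = NULL, *avail = NULL;

	ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx,
				       0, &known);
	ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx,
				       CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
				       &avail);
*/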
1578
1579 /* when we start a recovery, make sure all nodes use the same reclock file
1580    setting
1581 */
1582 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1583 {
1584         struct ctdb_context *ctdb = rec->ctdb;
1585         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1586         TDB_DATA data;
1587         uint32_t *nodes;
1588
1589         if (ctdb->recovery_lock_file == NULL) {
1590                 data.dptr  = NULL;
1591                 data.dsize = 0;
1592         } else {
1593                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1594                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1595         }
1596
1597         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1598         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1599                                         nodes, 0,
1600                                         CONTROL_TIMEOUT(),
1601                                         false, data,
1602                                         NULL, NULL,
1603                                         rec) != 0) {
1604                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1605                 talloc_free(tmp_ctx);
1606                 return -1;
1607         }
1608
1609         talloc_free(tmp_ctx);
1610         return 0;
1611 }
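
/* Wire-format note for CTDB_CONTROL_SET_RECLOCK_FILE: the payload is either
   an empty blob (no recovery lock file configured) or the path including its
   terminating NUL, which is why dsize is strlen()+1 above.  A receiver can
   therefore use the payload directly as a C string.  Sketch, with "data" as
   delivered to the control:

	const char *reclock_path =
		(data.dsize == 0) ? NULL : (const char *)data.dptr;
*/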
1612
1613
1614 /*
1615  * this callback is called for every node that failed to execute ctdb_takeover_run()
1616  * and set flag to re-run takeover run.
1617  */
1618 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1619 {
1620         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1621
1622         if (callback_data != NULL) {
1623                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1624
1625                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1626
1627                 ctdb_set_culprit(rec, node_pnn);
1628         }
1629 }
1630
1631
1632 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1633 {
1634         struct ctdb_context *ctdb = rec->ctdb;
1635         int i;
1636         struct ctdb_banning_state *ban_state;
1637
1638         *self_ban = false;
1639         for (i=0; i<ctdb->num_nodes; i++) {
1640                 if (ctdb->nodes[i]->ban_state == NULL) {
1641                         continue;
1642                 }
1643                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1644                 if (ban_state->count < 2*ctdb->num_nodes) {
1645                         continue;
1646                 }
1647
1648                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1649                         ctdb->nodes[i]->pnn, ban_state->count,
1650                         ctdb->tunable.recovery_ban_period));
1651                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1652                 ban_state->count = 0;
1653
1654                 /* Banning ourself? */
1655                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1656                         *self_ban = true;
1657                 }
1658         }
1659 }
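
/* Worked example of the threshold above: in a 4-node cluster a node is
   banned once its ban_state->count reaches 2*4 == 8 culprit credits; the
   count is then reset to 0, so the node has to accumulate a full set of
   credits again before it can be re-banned. */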
1660
1661 static bool do_takeover_run(struct ctdb_recoverd *rec,
1662                             struct ctdb_node_map *nodemap,
1663                             bool banning_credits_on_fail)
1664 {
1665         uint32_t disable_timeout;
1666         TDB_DATA data;
1667         int ret;
1668         bool ok;
1669
1670         if (rec->takeover_run_in_progress) {
1671                 DEBUG(DEBUG_ERR, (__location__
1672                                   " takeover run already in progress\n"));
1673                 ok = false;
1674                 goto done;
1675         }
1676
1677         /* Disable IP checks while doing this takeover run.  This will
1678          * stop those other nodes from triggering takeover runs when
1679          * they think they should be hosting an IP but it isn't yet on
1680          * an interface.
1681          */
1682         data.dptr  = (uint8_t*)&disable_timeout;
1683         data.dsize = sizeof(disable_timeout);
1684
1685         disable_timeout = rec->ctdb->tunable.takeover_timeout;
1686         if (ctdb_client_send_message(rec->ctdb, CTDB_BROADCAST_CONNECTED,
1687                                      CTDB_SRVID_DISABLE_IP_CHECK,
1688                                      data) != 0) {
1689                 DEBUG(DEBUG_INFO,("Failed to disable IP check\n"));
1690         }
1691
1692         rec->takeover_run_in_progress = true;
1693
1694         ret = ctdb_takeover_run(rec->ctdb, nodemap, takeover_fail_callback,
1695                                 banning_credits_on_fail ? rec : NULL);
1696
1697         /* Reenable IP checks */
1698         disable_timeout = 0;
1699         if (ctdb_client_send_message(rec->ctdb, CTDB_BROADCAST_CONNECTED,
1700                                      CTDB_SRVID_DISABLE_IP_CHECK,
1701                                      data) != 0) {
1702                 DEBUG(DEBUG_INFO,("Failed to reenable IP check\n"));
1703         }
1704
1705         if (ret != 0) {
1706                 DEBUG(DEBUG_ERR, ("IP reallocation failed\n"));
1707                 ok = false;
1708                 goto done;
1709         }
1710
1711         ok = true;
1712 done:
1713         rec->need_takeover_run = !ok;
1714         rec->takeover_run_in_progress = false;
1715         return ok;
1716 }
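
/* The disable/enable messages above are ordinary broadcast messages: a
   uint32_t timeout in seconds sent to CTDB_SRVID_DISABLE_IP_CHECK, where a
   timeout of 0 re-enables checking (see disable_ip_check_handler() below).
   Minimal sketch of sending one from elsewhere:

	uint32_t timeout = 60;
	TDB_DATA d;

	d.dptr  = (uint8_t *)&timeout;
	d.dsize = sizeof(timeout);
	ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
				 CTDB_SRVID_DISABLE_IP_CHECK, d);
*/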
1717
1718
1719 /*
1720   we are the recmaster, and recovery is needed - start a recovery run
1721  */
1722 static int do_recovery(struct ctdb_recoverd *rec, 
1723                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1724                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1725 {
1726         struct ctdb_context *ctdb = rec->ctdb;
1727         int i, j, ret;
1728         uint32_t generation;
1729         struct ctdb_dbid_map *dbmap;
1730         TDB_DATA data;
1731         uint32_t *nodes;
1732         struct timeval start_time;
1733         uint32_t culprit = (uint32_t)-1;
1734         bool self_ban;
1735
1736         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1737
1738         /* if recovery fails, force it again */
1739         rec->need_recovery = true;
1740
1741         ban_misbehaving_nodes(rec, &self_ban);
1742         if (self_ban) {
1743                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1744                 return -1;
1745         }
1746
1747         if (ctdb->tunable.verify_recovery_lock != 0) {
1748                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1749                 start_time = timeval_current();
1750                 if (!ctdb_recovery_lock(ctdb, true)) {
1751                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1752                                          "and banning ourselves for %u seconds\n",
1753                                          ctdb->tunable.recovery_ban_period));
1754                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1755                         return -1;
1756                 }
1757                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1758                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1759         }
1760
1761         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1762
1763         /* get a list of all databases */
1764         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1765         if (ret != 0) {
1766                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1767                 return -1;
1768         }
1769
1770         /* we do the db creation before we set the recovery mode, so the freeze happens
1771            on all databases we will be dealing with. */
1772
1773         /* verify that we have all the databases any other node has */
1774         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1775         if (ret != 0) {
1776                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1777                 return -1;
1778         }
1779
1780         /* verify that all other nodes have all our databases */
1781         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1782         if (ret != 0) {
1783                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1784                 return -1;
1785         }
1786         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1787
1788         /* update the database priority for all remote databases */
1789         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1790         if (ret != 0) {
1791                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1792         }
1793         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1794
1795
1796         /* update all other nodes to use the same setting for reclock files
1797            as the local recovery master.
1798         */
1799         sync_recovery_lock_file_across_cluster(rec);
1800
1801         /* set recovery mode to active on all nodes */
1802         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1803         if (ret != 0) {
1804                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1805                 return -1;
1806         }
1807
1808         /* execute the "startrecovery" event script on all nodes */
1809         ret = run_startrecovery_eventscript(rec, nodemap);
1810         if (ret!=0) {
1811                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1812                 return -1;
1813         }
1814
1815         /*
1816           update all nodes to have the same flags that we have
1817          */
1818         for (i=0;i<nodemap->num;i++) {
1819                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1820                         continue;
1821                 }
1822
1823                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1824                 if (ret != 0) {
1825                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1826                         return -1;
1827                 }
1828         }
1829
1830         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1831
1832         /* pick a new generation number */
1833         generation = new_generation();
1834
1835         /* Change the vnnmap on this node to use the new generation
1836            number, but not on any other nodes.
1837            This guarantees that if we abort the recovery prematurely
1838            for some reason (e.g. a node stops responding) we can just
1839            return immediately and we will re-enter recovery shortly
1840            afterwards.
1841            I.e. we deliberately leave the cluster with an inconsistent
1842            generation id so that we can abort recovery at any stage and
1843            just restart it from scratch.
1844          */
1845         vnnmap->generation = generation;
1846         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1847         if (ret != 0) {
1848                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1849                 return -1;
1850         }
1851
1852         data.dptr = (void *)&generation;
1853         data.dsize = sizeof(uint32_t);
1854
1855         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1856         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1857                                         nodes, 0,
1858                                         CONTROL_TIMEOUT(), false, data,
1859                                         NULL,
1860                                         transaction_start_fail_callback,
1861                                         rec) != 0) {
1862                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1863                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1864                                         nodes, 0,
1865                                         CONTROL_TIMEOUT(), false, tdb_null,
1866                                         NULL,
1867                                         NULL,
1868                                         NULL) != 0) {
1869                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1870                 }
1871                 return -1;
1872         }
1873
1874         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1875
1876         for (i=0;i<dbmap->num;i++) {
1877                 ret = recover_database(rec, mem_ctx,
1878                                        dbmap->dbs[i].dbid,
1879                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1880                                        pnn, nodemap, generation);
1881                 if (ret != 0) {
1882                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1883                         return -1;
1884                 }
1885         }
1886
1887         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1888
1889         /* commit all the changes */
1890         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1891                                         nodes, 0,
1892                                         CONTROL_TIMEOUT(), false, data,
1893                                         NULL, NULL,
1894                                         NULL) != 0) {
1895                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1896                 return -1;
1897         }
1898
1899         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1900         
1901
1902         /* update the capabilities for all nodes */
1903         ret = update_capabilities(ctdb, nodemap);
1904         if (ret!=0) {
1905                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1906                 return -1;
1907         }
1908
1909         /* build a new vnn map with all the currently active and
1910            unbanned nodes */
1911         generation = new_generation();
1912         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1913         CTDB_NO_MEMORY(ctdb, vnnmap);
1914         vnnmap->generation = generation;
1915         vnnmap->size = 0;
1916         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1917         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1918         for (i=j=0;i<nodemap->num;i++) {
1919                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1920                         continue;
1921                 }
1922                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1923                         /* this node cannot be an lmaster */
1924                         DEBUG(DEBUG_DEBUG, ("Node %d cannot be an LMASTER, skipping it\n", i));
1925                         continue;
1926                 }
1927
1928                 vnnmap->size++;
1929                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1930                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1931                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1932
1933         }
1934         if (vnnmap->size == 0) {
1935                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1936                 vnnmap->size++;
1937                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1938                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1939                 vnnmap->map[0] = pnn;
1940         }       
1941
1942         /* update to the new vnnmap on all nodes */
1943         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1944         if (ret != 0) {
1945                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1946                 return -1;
1947         }
1948
1949         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1950
1951         /* update recmaster to point to us for all nodes */
1952         ret = set_recovery_master(ctdb, nodemap, pnn);
1953         if (ret!=0) {
1954                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1955                 return -1;
1956         }
1957
1958         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1959
1960         /*
1961           update all nodes to have the same flags that we have
1962          */
1963         for (i=0;i<nodemap->num;i++) {
1964                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1965                         continue;
1966                 }
1967
1968                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1969                 if (ret != 0) {
1970                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1971                         return -1;
1972                 }
1973         }
1974
1975         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1976
1977         /* disable recovery mode */
1978         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1979         if (ret != 0) {
1980                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1981                 return -1;
1982         }
1983
1984         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1985
1986         /* Fetch known/available public IPs from each active node */
1987         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1988         if (ret != 0) {
1989                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1990                                  culprit));
1991                 rec->need_takeover_run = true;
1992                 return -1;
1993         }
1994
1995         do_takeover_run(rec, nodemap, false);
1996
1997         /* execute the "recovered" event script on all nodes */
1998         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
1999         if (ret!=0) {
2000                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
2001                 return -1;
2002         }
2003
2004         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
2005
2006         /* send a message to all clients telling them that the cluster 
2007            has been reconfigured */
2008         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
2009
2010         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
2011
2012         rec->need_recovery = false;
2013
2014         /* we managed to complete a full recovery, make sure to forgive
2015            any past sins by the nodes that could now participate in the
2016            recovery.
2017         */
2018         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
2019         for (i=0;i<nodemap->num;i++) {
2020                 struct ctdb_banning_state *ban_state;
2021
2022                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2023                         continue;
2024                 }
2025
2026                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
2027                 if (ban_state == NULL) {
2028                         continue;
2029                 }
2030
2031                 ban_state->count = 0;
2032         }
2033
2034
2035         /* We just finished a recovery successfully. 
2036            We now wait for rerecovery_timeout before we allow 
2037            another recovery to take place.
2038         */
2039         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
2040         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
2041         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
2042
2043         return 0;
2044 }
2045
2046
2047 /*
2048   elections are won by first checking the number of connected nodes, then
2049   the priority time, then the pnn
2050  */
2051 struct election_message {
2052         uint32_t num_connected;
2053         struct timeval priority_time;
2054         uint32_t pnn;
2055         uint32_t node_flags;
2056 };
2057
2058 /*
2059   form this node's election data
2060  */
2061 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2062 {
2063         int ret, i;
2064         struct ctdb_node_map *nodemap;
2065         struct ctdb_context *ctdb = rec->ctdb;
2066
2067         ZERO_STRUCTP(em);
2068
2069         em->pnn = rec->ctdb->pnn;
2070         em->priority_time = rec->priority_time;
2071
2072         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2073         if (ret != 0) {
2074                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
2075                 return;
2076         }
2077
2078         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2079         em->node_flags = rec->node_flags;
2080
2081         for (i=0;i<nodemap->num;i++) {
2082                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2083                         em->num_connected++;
2084                 }
2085         }
2086
2087         /* we shouldn't try to win this election if we can't be a recmaster */
2088         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2089                 em->num_connected = 0;
2090                 em->priority_time = timeval_current();
2091         }
2092
2093         talloc_free(nodemap);
2094 }
2095
2096 /*
2097   see if the given election data wins
2098  */
2099 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2100 {
2101         struct election_message myem;
2102         int cmp = 0;
2103
2104         ctdb_election_data(rec, &myem);
2105
2106         /* we can't win if we don't have the recmaster capability */
2107         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2108                 return false;
2109         }
2110
2111         /* we cant win if we are banned */
2112         /* we can't win if we are banned */
2113                 return false;
2114         }
2115
2116         /* we cant win if we are stopped */
2117         /* we can't win if we are stopped */
2118                 return false;
2119         }
2120
2121         /* we will automatically win if the other node is banned */
2122         if (em->node_flags & NODE_FLAGS_BANNED) {
2123                 return true;
2124         }
2125
2126         /* we will automatically win if the other node is stopped */
2127         if (em->node_flags & NODE_FLAGS_STOPPED) {
2128                 return true;
2129         }
2130
2131         /* try to use the most connected node */
2132         if (cmp == 0) {
2133                 cmp = (int)myem.num_connected - (int)em->num_connected;
2134         }
2135
2136         /* then the longest running node */
2137         if (cmp == 0) {
2138                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2139         }
2140
2141         if (cmp == 0) {
2142                 cmp = (int)myem.pnn - (int)em->pnn;
2143         }
2144
2145         return cmp > 0;
2146 }
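
/* Worked example of the ordering above: if our num_connected is 3 and the
   peer's is 2, cmp becomes positive and we win outright.  With connected
   counts tied, the earlier priority_time (the longest-running recovery
   daemon) wins; only if that is also tied does the final tiebreak
   cmp = myem.pnn - em->pnn apply, i.e. the higher pnn wins. */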
2147
2148 /*
2149   send out an election request
2150  */
2151 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
2152 {
2153         int ret;
2154         TDB_DATA election_data;
2155         struct election_message emsg;
2156         uint64_t srvid;
2157         struct ctdb_context *ctdb = rec->ctdb;
2158
2159         srvid = CTDB_SRVID_RECOVERY;
2160
2161         ctdb_election_data(rec, &emsg);
2162
2163         election_data.dsize = sizeof(struct election_message);
2164         election_data.dptr  = (unsigned char *)&emsg;
2165
2166
2167         /* send an election message to all active nodes */
2168         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2169         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2170
2171
2172         /* A new node that is already frozen has entered the cluster.
2173            The existing nodes are not frozen and don't need to be frozen
2174            until the election has ended and we start the actual recovery
2175         */
2176         if (update_recmaster == true) {
2177                 /* first we assume we will win the election and set 
2178                    recoverymaster to be ourself on the current node
2179                  */
2180                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2181                 if (ret != 0) {
2182                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2183                         return -1;
2184                 }
2185         }
2186
2187
2188         return 0;
2189 }
2190
2191 /*
2192   this function will unban all nodes in the cluster
2193 */
2194 static void unban_all_nodes(struct ctdb_context *ctdb)
2195 {
2196         int ret, i;
2197         struct ctdb_node_map *nodemap;
2198         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2199         
2200         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2201         if (ret != 0) {
2202                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2203                 return;
2204         }
2205
2206         for (i=0;i<nodemap->num;i++) {
2207                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2208                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2209                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
2210                 }
2211         }
2212
2213         talloc_free(tmp_ctx);
2214 }
2215
2216
2217 /*
2218   we think we are winning the election - send a broadcast election request
2219  */
2220 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2221 {
2222         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2223         int ret;
2224
2225         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
2226         if (ret != 0) {
2227                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2228         }
2229
2230         talloc_free(rec->send_election_te);
2231         rec->send_election_te = NULL;
2232 }
2233
2234 /*
2235   handler for memory dumps
2236 */
2237 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2238                              TDB_DATA data, void *private_data)
2239 {
2240         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2241         TDB_DATA *dump;
2242         int ret;
2243         struct srvid_request *rd;
2244
2245         if (data.dsize != sizeof(struct srvid_request)) {
2246                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2247                 talloc_free(tmp_ctx);
2248                 return;
2249         }
2250         rd = (struct srvid_request *)data.dptr;
2251
2252         dump = talloc_zero(tmp_ctx, TDB_DATA);
2253         if (dump == NULL) {
2254                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2255                 talloc_free(tmp_ctx);
2256                 return;
2257         }
2258         ret = ctdb_dump_memory(ctdb, dump);
2259         if (ret != 0) {
2260                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2261                 talloc_free(tmp_ctx);
2262                 return;
2263         }
2264
2265         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
2266
2267         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2268         if (ret != 0) {
2269                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2270                 talloc_free(tmp_ctx);
2271                 return;
2272         }
2273
2274         talloc_free(tmp_ctx);
2275 }
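
/* Hedged sketch of the requesting side of this exchange: the client fills
   in its return address as a struct srvid_request and sends it to the
   recovery master (how the sender learns the recmaster pnn, the reply
   srvid it listens on, and the srvid this handler is registered under are
   all outside this excerpt):

	struct srvid_request rd;
	TDB_DATA req;

	rd.pnn   = ctdb_get_pnn(ctdb);
	rd.srvid = my_reply_srvid;
	req.dptr  = (uint8_t *)&rd;
	req.dsize = sizeof(rd);
	ctdb_client_send_message(ctdb, recmaster_pnn, CTDB_SRVID_MEM_DUMP, req);

   "my_reply_srvid" and "recmaster_pnn" are placeholder values the caller
   must supply. */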
2276
2277 /*
2278   handler for getlog
2279 */
2280 static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2281                            TDB_DATA data, void *private_data)
2282 {
2283         struct ctdb_get_log_addr *log_addr;
2284         pid_t child;
2285
2286         if (data.dsize != sizeof(struct ctdb_get_log_addr)) {
2287                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2288                 return;
2289         }
2290         log_addr = (struct ctdb_get_log_addr *)data.dptr;
2291
2292         child = ctdb_fork_no_free_ringbuffer(ctdb);
2293         if (child == (pid_t)-1) {
2294                 DEBUG(DEBUG_ERR,("Failed to fork a log collector child\n"));
2295                 return;
2296         }
2297
2298         if (child == 0) {
2299                 ctdb_set_process_name("ctdb_rec_log_collector");
2300                 if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
2301                         DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch log collector child into client mode.\n"));
2302                         _exit(1);
2303                 }
2304                 ctdb_collect_log(ctdb, log_addr);
2305                 _exit(0);
2306         }
2307 }
2308
2309 /*
2310   handler for clearlog
2311 */
2312 static void clearlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2313                              TDB_DATA data, void *private_data)
2314 {
2315         ctdb_clear_log(ctdb);
2316 }
2317
2318 /*
2319   handler for reload_nodes
2320 */
2321 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2322                              TDB_DATA data, void *private_data)
2323 {
2324         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2325
2326         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2327
2328         reload_nodes_file(rec->ctdb);
2329 }
2330
2331
2332 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
2333                               struct timeval yt, void *p)
2334 {
2335         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2336
2337         talloc_free(rec->ip_check_disable_ctx);
2338         rec->ip_check_disable_ctx = NULL;
2339 }
2340
2341
2342 static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
2343                                   struct timeval t, void *p)
2344 {
2345         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2346
2347         DEBUG(DEBUG_NOTICE,
2348               ("Rebalance all nodes that have had ip assignment changes.\n"));
2349
2350         do_takeover_run(rec, rec->nodemap, false);
2351
2352         talloc_free(rec->deferred_rebalance_ctx);
2353         rec->deferred_rebalance_ctx = NULL;
2354 }
2355
2356         
2357 static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2358                              TDB_DATA data, void *private_data)
2359 {
2360         uint32_t pnn;
2361         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2362
2363         if (data.dsize != sizeof(uint32_t)) {
2364                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zu but expected %zu bytes\n", data.dsize, sizeof(uint32_t)));
2365                 return;
2366         }
2367
2368         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2369                 return;
2370         }
2371
2372         pnn = *(uint32_t *)&data.dptr[0];
2373
2374         lcp2_forcerebalance(ctdb, pnn);
2375         DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
2376
2377         if (rec->deferred_rebalance_ctx != NULL) {
2378                 talloc_free(rec->deferred_rebalance_ctx);
2379         }
2380         rec->deferred_rebalance_ctx = talloc_new(rec);
2381         event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
2382                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2383                         ctdb_rebalance_timeout, rec);
2384 }
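
/* Sketch of the matching sender: the payload is just the pnn of the node
   to rebalance, packed as a single uint32_t (exactly what the size check
   above enforces).  The srvid this handler is registered under is outside
   this excerpt; CTDB_SRVID_REBALANCE_NODE is assumed here:

	uint32_t target_pnn = 2;
	TDB_DATA d;

	d.dptr  = (uint8_t *)&target_pnn;
	d.dsize = sizeof(target_pnn);
	ctdb_client_send_message(ctdb, recmaster_pnn,
				 CTDB_SRVID_REBALANCE_NODE, d);
*/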
2385
2386
2387
2388 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2389                              TDB_DATA data, void *private_data)
2390 {
2391         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2392         struct ctdb_public_ip *ip;
2393
2394         if (rec->recmaster != rec->ctdb->pnn) {
2395                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2396                 return;
2397         }
2398
2399         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2400                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zu but expected %zu bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2401                 return;
2402         }
2403
2404         ip = (struct ctdb_public_ip *)data.dptr;
2405
2406         update_ip_assignment_tree(rec->ctdb, ip);
2407 }
2408
2409
2410 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2411                              TDB_DATA data, void *private_data)
2412 {
2413         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2414         uint32_t timeout;
2415
2416         if (rec->ip_check_disable_ctx != NULL) {
2417                 talloc_free(rec->ip_check_disable_ctx);
2418                 rec->ip_check_disable_ctx = NULL;
2419         }
2420
2421         if (data.dsize != sizeof(uint32_t)) {
2422                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data: %lu, "
2423                                  "expecting %lu\n", (long unsigned)data.dsize,
2424                                  (long unsigned)sizeof(uint32_t)));
2425                 return;
2426         }
2427         if (data.dptr == NULL) {
2428                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2429                 return;
2430         }
2431
2432         timeout = *((uint32_t *)data.dptr);
2433
2434         if (timeout == 0) {
2435                 DEBUG(DEBUG_NOTICE,("Reenabling ip check\n"));
2436                 return;
2437         }
2438                 
2439         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
2440
2441         rec->ip_check_disable_ctx = talloc_new(rec);
2442         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2443
2444         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2445 }
2446
2447
2448 /*
2449   handler for reload all ips.
2450 */
2451 static void ip_reloadall_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2452                              TDB_DATA data, void *private_data)
2453 {
2454         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2455
2456         if (data.dsize != sizeof(struct reloadips_all_reply)) {
2457                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2458                 return;
2459         }
2460
2461         reload_all_ips_request = (struct reloadips_all_reply *)talloc_steal(rec, data.dptr);
2462
2463         DEBUG(DEBUG_NOTICE,("RELOAD_ALL_IPS message received from node:%d srvid:%llu\n", reload_all_ips_request->pnn, (unsigned long long)reload_all_ips_request->srvid));
2464         return;
2465 }
2466
2467 static void async_reloadips_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2468 {
2469         uint32_t *status = callback_data;
2470
2471         if (res != 0) {
2472                 DEBUG(DEBUG_ERR,("Reload ips all failed on node %d\n", node_pnn));
2473                 *status = 1;
2474         }
2475 }
2476
2477 static int
2478 reload_all_ips(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, struct reloadips_all_reply *rips)
2479 {
2480         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2481         uint32_t *nodes;
2482         uint32_t status;
2483         int i;
2484
2485         DEBUG(DEBUG_ERR,("RELOAD ALL IPS on all active nodes\n"));
2486         for (i = 0; i< nodemap->num; i++) {
2487                 if (nodemap->nodes[i].flags != 0) {
2488                         DEBUG(DEBUG_ERR, ("Cannot reload ips on all nodes. Node %d is not up and healthy\n", i));
2489                         talloc_free(tmp_ctx);
2490                         return -1;
2491                 }
2492         }
2493
2494         /* send the flags update to all connected nodes */
2495         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2496         status = 0;
2497         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
2498                                         nodes, 0,
2499                                         CONTROL_TIMEOUT(),
2500                                         false, tdb_null,
2501                                         async_reloadips_callback, NULL,
2502                                         &status) != 0) {
2503                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2504                 talloc_free(tmp_ctx);
2505                 return -1;
2506         }
2507
2508         if (status != 0) {
2509                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2510                 talloc_free(tmp_ctx);
2511                 return -1;
2512         }
2513
2514         ctdb_client_send_message(ctdb, rips->pnn, rips->srvid, tdb_null);
2515
2516         talloc_free(tmp_ctx);
2517         return 0;
2518 }
2519
2520
2521 /*
2522   handler for ip reallocate; just add the request to the list and
2523   handle it later in the monitor_cluster loop, so we do not recurse
2524   into takeover_run() from within other requests
2525 */
2526 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
2527                                   TDB_DATA data, void *private_data)
2528 {
2529         struct srvid_request *request;
2530         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2531                                                     struct ctdb_recoverd);
2532
2533         if (data.dsize != sizeof(struct srvid_request)) {
2534                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2535                 return;
2536         }
2537
2538         request = (struct srvid_request *)data.dptr;
2539
2540         srvid_request_add(ctdb, &rec->reallocate_requests, request);
2541 }
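
/* Sketch of the requesting side of this deferral pattern: the client sends
   its return address as a struct srvid_request, and the reply (sent later
   by process_ipreallocate_requests() below) carries an int32_t holding the
   recmaster pnn on success or -1 on failure.  The srvid the handler is
   registered under is outside this excerpt:

	struct srvid_request request;
	TDB_DATA d;

	request.pnn   = ctdb_get_pnn(ctdb);
	request.srvid = my_reply_srvid;
	d.dptr  = (uint8_t *)&request;
	d.dsize = sizeof(request);
	ctdb_client_send_message(ctdb, recmaster_pnn, takeover_run_srvid, d);

   "my_reply_srvid", "recmaster_pnn" and "takeover_run_srvid" are
   placeholders the caller must supply. */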
2542
2543 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2544                                           struct ctdb_recoverd *rec)
2545 {
2546         TDB_DATA result;
2547         int32_t ret;
2548         uint32_t culprit;
2549
2550         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2551
2552         /* update the list of public ips that a node can handle for
2553            all connected nodes
2554         */
2555         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2556         if (ret != 0) {
2557                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2558                                  culprit));
2559                 rec->need_takeover_run = true;
2560         }
2561         if (ret == 0) {
2562                 if (do_takeover_run(rec, rec->nodemap, false)) {
2563                         ret = ctdb_get_pnn(ctdb);
2564                 } else {
2565                         ret = -1;
2566                 }
2567         }
2568
2569         result.dsize = sizeof(int32_t);
2570         result.dptr  = (uint8_t *)&ret;
2571
2572         srvid_requests_reply(ctdb, &rec->reallocate_requests, result);
2573 }
2574
2575
2576 /*
2577   handler for recovery master elections
2578 */
2579 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2580                              TDB_DATA data, void *private_data)
2581 {
2582         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2583         int ret;
2584         struct election_message *em = (struct election_message *)data.dptr;
2585         TALLOC_CTX *mem_ctx;
2586
2587         /* we got an election packet - update the timeout for the election */
2588         talloc_free(rec->election_timeout);
2589         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2590                                                 fast_start ?
2591                                                 timeval_current_ofs(0, 500000) :
2592                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2593                                                 ctdb_election_timeout, rec);
2594
2595         mem_ctx = talloc_new(ctdb);
2596
2597         /* someone called an election. check their election data
2598            and if we disagree and we would rather be the elected node, 
2599            send a new election message to all other nodes
2600          */
2601         if (ctdb_election_win(rec, em)) {
2602                 if (!rec->send_election_te) {
2603                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2604                                                                 timeval_current_ofs(0, 500000),
2605                                                                 election_send_request, rec);
2606                 }
2607                 talloc_free(mem_ctx);
2608                 /*unban_all_nodes(ctdb);*/
2609                 return;
2610         }
2611         
2612         /* we didn't win */
2613         talloc_free(rec->send_election_te);
2614         rec->send_election_te = NULL;
2615
2616         if (ctdb->tunable.verify_recovery_lock != 0) {
2617                 /* release the recmaster lock */
2618                 if (em->pnn != ctdb->pnn &&
2619                     ctdb->recovery_lock_fd != -1) {
2620                         close(ctdb->recovery_lock_fd);
2621                         ctdb->recovery_lock_fd = -1;
2622                         unban_all_nodes(ctdb);
2623                 }
2624         }
2625
2626         /* ok, let that guy become recmaster then */
2627         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2628         if (ret != 0) {
2629                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2630                 talloc_free(mem_ctx);
2631                 return;
2632         }
2633
2634         talloc_free(mem_ctx);
2635         return;
2636 }
2637
2638
2639 /*
2640   force the start of the election process
2641  */
2642 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2643                            struct ctdb_node_map *nodemap)
2644 {
2645         int ret;
2646         struct ctdb_context *ctdb = rec->ctdb;
2647
2648         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2649
2650         /* set all nodes to recovery mode to stop all internode traffic */
2651         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2652         if (ret != 0) {
2653                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2654                 return;
2655         }
2656
2657         talloc_free(rec->election_timeout);
2658         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2659                                                 fast_start ?
2660                                                 timeval_current_ofs(0, 500000) :
2661                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2662                                                 ctdb_election_timeout, rec);
2663
2664         ret = send_election_request(rec, pnn, true);
2665         if (ret!=0) {
2666                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2667                 return;
2668         }
2669
2670         /* wait for a few seconds to collect all responses */
2671         ctdb_wait_election(rec);
2672 }
2673
2674
2675
2676 /*
2677   handler for when a node changes its flags
2678 */
2679 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2680                             TDB_DATA data, void *private_data)
2681 {
2682         int ret;
2683         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2684         struct ctdb_node_map *nodemap=NULL;
2685         TALLOC_CTX *tmp_ctx;
2686         int i;
2687         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2688         int disabled_flag_changed;
2689
2690         if (data.dsize != sizeof(*c)) {
2691                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2692                 return;
2693         }
2694
2695         tmp_ctx = talloc_new(ctdb);
2696         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2697
2698         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2699         if (ret != 0) {
2700                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2701                 talloc_free(tmp_ctx);
2702                 return;
2703         }
2704
2705
2706         for (i=0;i<nodemap->num;i++) {
2707                 if (nodemap->nodes[i].pnn == c->pnn) break;
2708         }
2709
2710         if (i == nodemap->num) {
2711                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2712                 talloc_free(tmp_ctx);
2713                 return;
2714         }
2715
2716         if (c->old_flags != c->new_flags) {
2717                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2718         }
2719
2720         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2721
2722         nodemap->nodes[i].flags = c->new_flags;
2723
2724         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2725                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2726
2727         if (ret == 0) {
2728                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2729                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2730         }
2731         
2732         if (ret == 0 &&
2733             ctdb->recovery_master == ctdb->pnn &&
2734             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2735                 /* Only do the takeover run if the perm disabled or unhealthy
2736                    flags changed since these will cause an ip failover but not
2737                    a recovery.
2738                    If the node became disconnected or banned this will also
2739                    lead to an ip address failover but that is handled 
2740                    during recovery
2741                 */
2742                 if (disabled_flag_changed) {
2743                         rec->need_takeover_run = true;
2744                 }
2745         }
2746
2747         talloc_free(tmp_ctx);
2748 }
2749
2750 /*
2751   handler for when we need to push out flag changes to all other nodes
2752 */
2753 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2754                             TDB_DATA data, void *private_data)
2755 {
2756         int ret;
2757         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2758         struct ctdb_node_map *nodemap=NULL;
2759         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2760         uint32_t recmaster;
2761         uint32_t *nodes;
2762
2763         /* find the recovery master */
2764         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2765         if (ret != 0) {
2766                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2767                 talloc_free(tmp_ctx);
2768                 return;
2769         }
2770
2771         /* read the node flags from the recmaster */
2772         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2773         if (ret != 0) {
2774                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", recmaster));
2775                 talloc_free(tmp_ctx);
2776                 return;
2777         }
2778         if (c->pnn >= nodemap->num) {
2779                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %u\n", c->pnn));
2780                 talloc_free(tmp_ctx);
2781                 return;
2782         }
2783
2784         /* send the flags update to all connected nodes */
2785         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2786
2787         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2788                                       nodes, 0, CONTROL_TIMEOUT(),
2789                                       false, data,
2790                                       NULL, NULL,
2791                                       NULL) != 0) {
2792                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2793
2794                 talloc_free(tmp_ctx);
2795                 return;
2796         }
2797
2798         talloc_free(tmp_ctx);
2799 }
2800
2801
2802 struct verify_recmode_normal_data {
2803         uint32_t count;
2804         enum monitor_result status;
2805 };
2806
2807 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2808 {
2809         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2810
2811
2812         /* one more node has responded with recmode data */
2813         rmdata->count--;
2814
2815         /* if we failed to get the recmode, then return an error and let
2816            the main loop try again.
2817         */
2818         if (state->state != CTDB_CONTROL_DONE) {
2819                 if (rmdata->status == MONITOR_OK) {
2820                         rmdata->status = MONITOR_FAILED;
2821                 }
2822                 return;
2823         }
2824
2825         /* if we got a response, then the recmode will be stored in the
2826            status field
2827         */
2828         if (state->status != CTDB_RECOVERY_NORMAL) {
2829                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2830                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2831         }
2832
2833         return;
2834 }
2835
2836
2837 /* verify that all nodes are in normal recovery mode */
2838 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2839 {
2840         struct verify_recmode_normal_data *rmdata;
2841         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2842         struct ctdb_client_control_state *state;
2843         enum monitor_result status;
2844         int j;
2845         
2846         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2847         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2848         rmdata->count  = 0;
2849         rmdata->status = MONITOR_OK;
2850
2851         /* loop over all active nodes and send an async getrecmode call
2852            to each of them */
2853         for (j=0; j<nodemap->num; j++) {
2854                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2855                         continue;
2856                 }
2857                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2858                                         CONTROL_TIMEOUT(), 
2859                                         nodemap->nodes[j].pnn);
2860                 if (state == NULL) {
2861                         /* we failed to send the control, treat this as
2862                            an error and try again next iteration
2863                         */
2864                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2865                         talloc_free(mem_ctx);
2866                         return MONITOR_FAILED;
2867                 }
2868
2869                 /* set up the callback functions */
2870                 state->async.fn = verify_recmode_normal_callback;
2871                 state->async.private_data = rmdata;
2872
2873                 /* one more control to wait for to complete */
2874                 rmdata->count++;
2875         }
2876
2877
2878         /* now wait for up to the maximum number of seconds allowed
2879            or until all nodes we expect a response from have replied
2880         */
2881         while (rmdata->count > 0) {
2882                 event_loop_once(ctdb->ev);
2883         }
2884
2885         status = rmdata->status;
2886         talloc_free(mem_ctx);
2887         return status;
2888 }
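
/* Both verify_recmode() above and verify_recmaster() below use the same
 * fan-out idiom: fire one async control per active node, count the
 * outstanding replies, and pump the event loop until the count drops to
 * zero.  No extra timer is needed - each control carries its own
 * CONTROL_TIMEOUT(), so an unresponsive node still completes the control
 * (with an error) and the callback still decrements the counter. */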
2889
2890
2891 struct verify_recmaster_data {
2892         struct ctdb_recoverd *rec;
2893         uint32_t count;
2894         uint32_t pnn;
2895         enum monitor_result status;
2896 };
2897
2898 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2899 {
2900         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2901
2902
2903         /* one more node has responded with recmaster data */
2904         rmdata->count--;
2905
2906         /* if we failed to get the recmaster, then return an error and let
2907            the main loop try again.
2908         */
2909         if (state->state != CTDB_CONTROL_DONE) {
2910                 if (rmdata->status == MONITOR_OK) {
2911                         rmdata->status = MONITOR_FAILED;
2912                 }
2913                 return;
2914         }
2915
2916         /* if we got a response, then the recmaster will be stored in the
2917            status field
2918         */
2919         if (state->status != rmdata->pnn) {
2920                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2921                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2922                 rmdata->status = MONITOR_ELECTION_NEEDED;
2923         }
2924
2925         return;
2926 }
2927
2928
2929 /* verify that all nodes agree that we are the recmaster */
2930 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2931 {
2932         struct ctdb_context *ctdb = rec->ctdb;
2933         struct verify_recmaster_data *rmdata;
2934         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2935         struct ctdb_client_control_state *state;
2936         enum monitor_result status;
2937         int j;
2938         
2939         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2940         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2941         rmdata->rec    = rec;
2942         rmdata->count  = 0;
2943         rmdata->pnn    = pnn;
2944         rmdata->status = MONITOR_OK;
2945
2946         /* loop over all active nodes and send an async getrecmaster call
2947            to each of them */
2948         for (j=0; j<nodemap->num; j++) {
2949                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2950                         continue;
2951                 }
2952                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2953                                         CONTROL_TIMEOUT(),
2954                                         nodemap->nodes[j].pnn);
2955                 if (state == NULL) {
2956                         /* we failed to send the control, treat this as
2957                            an error and try again next iteration
2958                         */
2959                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2960                         talloc_free(mem_ctx);
2961                         return MONITOR_FAILED;
2962                 }
2963
2964                 /* set up the callback functions */
2965                 state->async.fn = verify_recmaster_callback;
2966                 state->async.private_data = rmdata;
2967
2968                 /* one more control to wait for to complete */
2969                 rmdata->count++;
2970         }
2971
2972
2973         /* now wait for up to the maximum number of seconds allowed
2974            or until all nodes we expect a response from have replied
2975         */
2976         while (rmdata->count > 0) {
2977                 event_loop_once(ctdb->ev);
2978         }
2979
2980         status = rmdata->status;
2981         talloc_free(mem_ctx);
2982         return status;
2983 }
2984
2985 static bool interfaces_have_changed(struct ctdb_context *ctdb,
2986                                     struct ctdb_recoverd *rec)
2987 {
2988         struct ctdb_control_get_ifaces *ifaces = NULL;
2989         TALLOC_CTX *mem_ctx;
2990         bool ret = false;
2991
2992         mem_ctx = talloc_new(NULL);
2993
2994         /* Read the interfaces from the local node */
2995         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
2996                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
2997                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
2998                 /* We could return an error.  However, this will be
2999                  * rare so we'll decide that the interfaces have
3000                  * actually changed, just in case.
3001                  */
3002                 talloc_free(mem_ctx);
3003                 return true;
3004         }
3005
3006         if (!rec->ifaces) {
3007                 /* We haven't been here before so things have changed */
3008                 DEBUG(DEBUG_NOTICE, ("Initial interface list fetched\n"));
3009                 ret = true;
3010         } else if (rec->ifaces->num != ifaces->num) {
3011                 /* Number of interfaces has changed */
3012                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3013                                      rec->ifaces->num, ifaces->num));
3014                 ret = true;
3015         } else {
3016                 /* See if interface names or link states have changed */
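                /* NB: this comparison assumes the daemon reports the
                 * interfaces in a stable order, so slot i refers to the
                 * same interface in both snapshots; a reordering would
                 * show up as a name change and also force a takeover
                 * run, which is safe */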
3017                 int i;
3018                 for (i = 0; i < rec->ifaces->num; i++) {
3019                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
3020                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3021                                 DEBUG(DEBUG_NOTICE,
3022                                       ("Interface in slot %d changed: %s => %s\n",
3023                                        i, iface->name, ifaces->ifaces[i].name));
3024                                 ret = true;
3025                                 break;
3026                         }
3027                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3028                                 DEBUG(DEBUG_NOTICE,
3029                                       ("Interface %s changed state: %d => %d\n",
3030                                        iface->name, iface->link_state,
3031                                        ifaces->ifaces[i].link_state));
3032                                 ret = true;
3033                                 break;
3034                         }
3035                 }
3036         }
3037
3038         talloc_free(rec->ifaces);
3039         rec->ifaces = talloc_steal(rec, ifaces);
3040
3041         talloc_free(mem_ctx);
3042         return ret;
3043 }
3044
3045 /* called to check that the local allocation of public ip addresses is ok.
3046 */
3047 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
3048 {
3049         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3050         struct ctdb_uptime *uptime1 = NULL;
3051         struct ctdb_uptime *uptime2 = NULL;
3052         int ret, j;
3053         bool need_takeover_run = false;
3054
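        /* The uptime data is fetched twice, bracketing the interface
         * check below.  If the recovery start/finish timestamps differ
         * between the two snapshots then a recovery ran while we were
         * gathering data, so the checks further down would be based on
         * stale state and are skipped for this iteration. */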
3055         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3056                                 CTDB_CURRENT_NODE, &uptime1);
3057         if (ret != 0) {
3058                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3059                 talloc_free(mem_ctx);
3060                 return -1;
3061         }
3062
3063         if (interfaces_have_changed(ctdb, rec)) {
3064                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
3065                                      "local node %u - force takeover run\n",
3066                                      pnn));
3067                 need_takeover_run = true;
3068         }
3069
3070         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3071                                 CTDB_CURRENT_NODE, &uptime2);
3072         if (ret != 0) {
3073                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3074                 talloc_free(mem_ctx);
3075                 return -1;
3076         }
3077
3078         /* skip the check if the startrecovery time has changed */
3079         if (timeval_compare(&uptime1->last_recovery_started,
3080                             &uptime2->last_recovery_started) != 0) {
3081                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery start time changed while we read the public ip list. skipping public ip address check\n"));
3082                 talloc_free(mem_ctx);
3083                 return 0;
3084         }
3085
3086         /* skip the check if the endrecovery time has changed */
3087         if (timeval_compare(&uptime1->last_recovery_finished,
3088                             &uptime2->last_recovery_finished) != 0) {
3089                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery finish time changed while we read the public ip list. skipping public ip address check\n"));
3090                 talloc_free(mem_ctx);
3091                 return 0;
3092         }
3093
3094         /* skip the check if we have started but not finished recovery */
3095         if (timeval_compare(&uptime1->last_recovery_finished,
3096                             &uptime1->last_recovery_started) != 1) {
3097                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
3098                 talloc_free(mem_ctx);
3099
3100                 return 0;
3101         }
3102
3103         /* verify that we have the ip addresses we should have and
3104            that we don't have ones we shouldn't have.
3105            if an address is assigned to us but not on an interface,
3106            or is unassigned (pnn == -1) while we are healthy and
3107            could host it, we ask the recmaster for a takeover run.
3108            if we are serving an address we should not be serving,
3109            we release it locally right away.
3110         */
3111         if (ctdb->tunable.disable_ip_failover == 0) {
3112                 struct ctdb_all_public_ips *ips = NULL;
3113
3114                 /* read the *available* IPs from the local node */
3115                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3116                 if (ret != 0) {
3117                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3118                         talloc_free(mem_ctx);
3119                         return -1;
3120                 }
3121
3122                 for (j=0; j<ips->num; j++) {
3123                         if (ips->ips[j].pnn == -1 &&
3124                             nodemap->nodes[pnn].flags == 0) {
3125                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3126                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3127                                 need_takeover_run = true;
3128                         }
3129                 }
3130
3131                 talloc_free(ips);
3132
3133                 /* read the *known* IPs from the local node */
3134                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3135                 if (ret != 0) {
3136                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3137                         talloc_free(mem_ctx);
3138                         return -1;
3139                 }
3140
3141                 for (j=0; j<ips->num; j++) {
3142                         if (ips->ips[j].pnn == pnn) {
3143                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3144                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3145                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3146                                         need_takeover_run = true;
3147                                 }
3148                         } else {
3149                                 if (ctdb->do_checkpublicip &&
3150                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3151
3152                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3153                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3154
3155                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3156                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3157                                         }
3158                                 }
3159                         }
3160                 }
3161         }
3162
3163         if (need_takeover_run) {
3164                 struct srvid_request rd;
3165                 TDB_DATA data;
3166
3167                 DEBUG(DEBUG_CRIT,("Triggering takeover run\n"));
3168
3169                 rd.pnn = ctdb->pnn;
3170                 rd.srvid = 0;
3171                 data.dptr = (uint8_t *)&rd;
3172                 data.dsize = sizeof(rd);
3173
3174                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3175                 if (ret != 0) {
3176                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster: %d\n", (int)rec->recmaster));
3177                 }
3178         }
3179         talloc_free(mem_ctx);
3180         return 0;
3181 }
3182
3183
3184 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3185 {
3186         struct ctdb_node_map **remote_nodemaps = callback_data;
3187
3188         if (node_pnn >= ctdb->num_nodes) {
3189                 DEBUG(DEBUG_ERR,(__location__ " nodemap reply from invalid node %u\n", node_pnn));
3190                 return;
3191         }
3192
3193         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3194
3195 }
3196
3197 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3198         struct ctdb_node_map *nodemap,
3199         struct ctdb_node_map **remote_nodemaps)
3200 {
3201         uint32_t *nodes;
3202
3203         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3204         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3205                                         nodes, 0,
3206                                         CONTROL_TIMEOUT(), false, tdb_null,
3207                                         async_getnodemap_callback,
3208                                         NULL,
3209                                         remote_nodemaps) != 0) {
3210                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3211
3212                 return -1;
3213         }
3214
3215         return 0;
3216 }
3217
3218 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
3219 struct ctdb_check_reclock_state {
3220         struct ctdb_context *ctdb;
3221         struct timeval start_time;
3222         int fd[2];
3223         pid_t child;
3224         struct timed_event *te;
3225         struct fd_event *fde;
3226         enum reclock_child_status status;
3227 };
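
/* Overview of the parent/child protocol used by check_recovery_lock():
 * the parent creates a pipe and forks; the child pread()s a single byte
 * from the already-held recovery lock file and writes a status byte
 * (RECLOCK_OK or RECLOCK_FAILED) back through the pipe; the parent waits
 * on the pipe with a 15 second timer.  A short read or a timeout means
 * i/o to the cluster filesystem is failing or hanging. */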
3228
3229 /* when we free the reclock state we must kill any child process.
3230 */
3231 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
3232 {
3233         struct ctdb_context *ctdb = state->ctdb;
3234
3235         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
3236
3237         if (state->fd[0] != -1) {
3238                 close(state->fd[0]);
3239                 state->fd[0] = -1;
3240         }
3241         if (state->fd[1] != -1) {
3242                 close(state->fd[1]);
3243                 state->fd[1] = -1;
3244         }
3245         ctdb_kill(ctdb, state->child, SIGKILL);
3246         return 0;
3247 }
3248
3249 /*
3250   called if our check_reclock child times out. this would happen if
3251   i/o to the reclock file blocks.
3252  */
3253 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
3254                                          struct timeval t, void *private_data)
3255 {
3256         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
3257                                            struct ctdb_check_reclock_state);
3258
3259         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - CFS slow to grant locks?\n"));
3260         state->status = RECLOCK_TIMEOUT;
3261 }
3262
3263 /* this is called when the child process has completed checking the reclock
3264    file and has written data back to us through the pipe.
3265 */
3266 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
3267                              uint16_t flags, void *private_data)
3268 {
3269         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
3270                                              struct ctdb_check_reclock_state);
3271         char c = 0;
3272         int ret;
3273
3274         /* we got a response from our child process so we can abort the
3275            timeout.
3276         */
3277         talloc_free(state->te);
3278         state->te = NULL;
3279
3280         ret = read(state->fd[0], &c, 1);
3281         if (ret != 1 || c != RECLOCK_OK) {
3282                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3283                 state->status = RECLOCK_FAILED;
3284
3285                 return;
3286         }
3287
3288         state->status = RECLOCK_OK;
3289         return;
3290 }
3291
3292 static int check_recovery_lock(struct ctdb_context *ctdb)
3293 {
3294         int ret;
3295         struct ctdb_check_reclock_state *state;
3296         pid_t parent = getpid();
3297
3298         if (ctdb->recovery_lock_fd == -1) {
3299                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3300                 return -1;
3301         }
3302
3303         state = talloc(ctdb, struct ctdb_check_reclock_state);
3304         CTDB_NO_MEMORY(ctdb, state);
3305
3306         state->ctdb = ctdb;
3307         state->start_time = timeval_current();
3308         state->status = RECLOCK_CHECKING;
3309         state->fd[0] = -1;
3310         state->fd[1] = -1;
3311
3312         ret = pipe(state->fd);
3313         if (ret != 0) {
3314                 talloc_free(state);
3315                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3316                 return -1;
3317         }
3318
3319         state->child = ctdb_fork(ctdb);
3320         if (state->child == (pid_t)-1) {
3321                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_recovery_lock\n"));
3322                 close(state->fd[0]);
3323                 state->fd[0] = -1;
3324                 close(state->fd[1]);
3325                 state->fd[1] = -1;
3326                 talloc_free(state);
3327                 return -1;
3328         }
3329
3330         if (state->child == 0) {
3331                 char cc = RECLOCK_OK;
3332                 close(state->fd[0]);
3333                 state->fd[0] = -1;
3334
3335                 ctdb_set_process_name("ctdb_rec_reclock");
3336                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3337                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3338                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3339                         cc = RECLOCK_FAILED;
3340                 }
3341
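                /* the return value of this write is not checked: if it
                 * fails, the parent sees a short read and records
                 * RECLOCK_FAILED, or its 15 second timer fires */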
3342                 write(state->fd[1], &cc, 1);
3343                 /* make sure we die when our parent dies */
3344                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3345                         sleep(5);
3346                 }
3347                 _exit(0);
3348         }
3349         close(state->fd[1]);
3350         state->fd[1] = -1;
3351         set_close_on_exec(state->fd[0]);
3352
3353         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3354
3355         talloc_set_destructor(state, check_reclock_destructor);
3356
3357         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3358                                     ctdb_check_reclock_timeout, state);
3359         if (state->te == NULL) {
3360                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3361                 talloc_free(state);
3362                 return -1;
3363         }
3364
3365         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3366                                 EVENT_FD_READ,
3367                                 reclock_child_handler,
3368                                 (void *)state);
3369
3370         if (state->fde == NULL) {
3371                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3372                 talloc_free(state);
3373                 return -1;
3374         }
3375         tevent_fd_set_auto_close(state->fde);
3376
3377         while (state->status == RECLOCK_CHECKING) {
3378                 event_loop_once(ctdb->ev);
3379         }
3380
3381         if (state->status == RECLOCK_FAILED) {
3382                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3383                 close(ctdb->recovery_lock_fd);
3384                 ctdb->recovery_lock_fd = -1;
3385                 talloc_free(state);
3386                 return -1;
3387         }
3388
3389         talloc_free(state);
3390         return 0;
3391 }
3392
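/* Reconcile our cached copy of the recovery lock file setting with what
 * the daemon currently has configured.  Three cases are handled below:
 * the reclock has been disabled, it has just been configured for the
 * first time, or its path has changed.  In every case any old fd is
 * closed here; actually (re)taking the lock happens elsewhere. */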
3393 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3394 {
3395         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3396         const char *reclockfile;
3397
3398         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3399                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3400                 talloc_free(tmp_ctx);
3401                 return -1;
3402         }
3403
3404         if (reclockfile == NULL) {
3405                 if (ctdb->recovery_lock_file != NULL) {
3406                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3407                         talloc_free(ctdb->recovery_lock_file);
3408                         ctdb->recovery_lock_file = NULL;
3409                         if (ctdb->recovery_lock_fd != -1) {
3410                                 close(ctdb->recovery_lock_fd);
3411                                 ctdb->recovery_lock_fd = -1;
3412                         }
3413                 }
3414                 ctdb->tunable.verify_recovery_lock = 0;
3415                 talloc_free(tmp_ctx);
3416                 return 0;
3417         }
3418
3419         if (ctdb->recovery_lock_file == NULL) {
3420                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3421                 if (ctdb->recovery_lock_fd != -1) {
3422                         close(ctdb->recovery_lock_fd);
3423                         ctdb->recovery_lock_fd = -1;
3424                 }
3425                 talloc_free(tmp_ctx);
3426                 return 0;
3427         }
3428
3429
3430         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3431                 talloc_free(tmp_ctx);
3432                 return 0;
3433         }
3434
3435         talloc_free(ctdb->recovery_lock_file);
3436         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3437         ctdb->tunable.verify_recovery_lock = 0;
3438         if (ctdb->recovery_lock_fd != -1) {
3439                 close(ctdb->recovery_lock_fd);
3440                 ctdb->recovery_lock_fd = -1;
3441         }
3442
3443         talloc_free(tmp_ctx);
3444         return 0;
3445 }
3446
3447 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3448                       TALLOC_CTX *mem_ctx)
3449 {
3450         uint32_t pnn;
3451         struct ctdb_node_map *nodemap=NULL;
3452         struct ctdb_node_map *recmaster_nodemap=NULL;
3453         struct ctdb_node_map **remote_nodemaps=NULL;
3454         struct ctdb_vnn_map *vnnmap=NULL;
3455         struct ctdb_vnn_map *remote_vnnmap=NULL;
3456         int32_t debug_level;
3457         int i, j, ret;
3458         bool self_ban;
3459
3460
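        /* NOTE: every early "return" below just abandons this iteration;
         * monitor_cluster() re-invokes main_loop() roughly once per
         * second, so transient failures are simply retried on the next
         * pass. */
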
3461         /* verify that the main daemon is still running */
3462         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3463                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3464                 exit(-1);
3465         }
3466
3467         /* ping the local daemon to tell it we are alive */
3468         ctdb_ctrl_recd_ping(ctdb);
3469
3470         if (rec->election_timeout) {
3471                 /* an election is in progress */
3472                 return;
3473         }
3474
3475         /* read the debug level from the parent and update locally */
3476         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3477         if (ret != 0) {
3478                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3479                 return;
3480         }
3481         LogLevel = debug_level;
3482
3483         /* get relevant tunables */
3484         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3485         if (ret != 0) {
3486                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3487                 return;
3488         }
3489
3490         /* get the current recovery lock file from the server */
3491         if (update_recovery_lock_file(ctdb) != 0) {
3492                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3493                 return;
3494         }
3495
3496         /* Make sure that if recovery lock verification becomes disabled,
3497            we close the file
3498         */
3499         if (ctdb->tunable.verify_recovery_lock == 0) {
3500                 if (ctdb->recovery_lock_fd != -1) {
3501                         close(ctdb->recovery_lock_fd);
3502                         ctdb->recovery_lock_fd = -1;
3503                 }
3504         }
3505
3506         pnn = ctdb_get_pnn(ctdb);
3507
3508         /* get the vnnmap */
3509         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3510         if (ret != 0) {
3511                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3512                 return;
3513         }
3514
3515
3516         /* get number of nodes */
3517         if (rec->nodemap) {
3518                 talloc_free(rec->nodemap);
3519                 rec->nodemap = NULL;
3520                 nodemap=NULL;
3521         }
3522         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3523         if (ret != 0) {
3524                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3525                 return;
3526         }
3527         nodemap = rec->nodemap;
3528
3529         /* remember our own node flags */
3530         rec->node_flags = nodemap->nodes[pnn].flags;
3531
3532         ban_misbehaving_nodes(rec, &self_ban);
3533         if (self_ban) {
3534                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3535                 return;
3536         }
3537
3538         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3539            also frozen and that the recmode is set to active.
3540         */
3541         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3542                 /* If this node has become inactive then we want to
3543                  * reduce the chances of it taking over the recovery
3544                  * master role when it becomes active again.  This
3545                  * helps to stabilise the recovery master role so that
3546                  * it stays on the most stable node.
3547                  */
3548                 rec->priority_time = timeval_current();
3549
3550                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3551                 if (ret != 0) {
3552                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3553                 }
3554                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3555                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3556
3557                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3558                         if (ret != 0) {
3559                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3560                                 return;
3561                         }
3562                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3563                         if (ret != 0) {
3564                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3565
3566                                 return;
3567                         }
3568                 }
3569
3570                 /* If this node is stopped or banned then it is not the recovery
3571                  * master, so don't do anything. This prevents stopped or banned
3572                  * node from starting election and sending unnecessary controls.
3573                  */
3574                 return;
3575         }
3576
3577         /* check which node is the recovery master */
3578         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3579         if (ret != 0) {
3580                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3581                 return;
3582         }
3583
3584         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3585         if (rec->recmaster != pnn) {
3586                 TALLOC_FREE(rec->reallocate_requests);
3587         }
3588
3589         /* This is a special case.  When the recovery daemon is started,
3590          * recmaster is set to -1.  If the node was not started in the
3591          * stopped state, start an election to decide the recovery master.
3592          */
3593         if (rec->recmaster == (uint32_t)-1) {
3594                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master not yet set - forcing election\n"));
3595                 force_election(rec, pnn, nodemap);
3596                 return;
3597         }
3598
3599         /* update the capabilities for all nodes */
3600         ret = update_capabilities(ctdb, nodemap);
3601         if (ret != 0) {
3602                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3603                 return;
3604         }
3605
3606         /*
3607          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3608          * but we have, then force an election and try to become the new
3609          * recmaster.
3610          */
3611         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3612             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3613              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3614                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3615                                   " but we (node %u) have - force an election\n",
3616                                   rec->recmaster, pnn));
3617                 force_election(rec, pnn, nodemap);
3618                 return;
3619         }
3620
3621         /* count how many active nodes there are */
3622         rec->num_active    = 0;
3623         rec->num_connected = 0;
3624         for (i=0; i<nodemap->num; i++) {
3625                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3626                         rec->num_active++;
3627                 }
3628                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3629                         rec->num_connected++;
3630                 }
3631         }
3632
3633
3634         /* verify that the recmaster node is still active */
3635         for (j=0; j<nodemap->num; j++) {
3636                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3637                         break;
3638                 }
3639         }
3640
3641         if (j == nodemap->num) {
3642                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3643                 force_election(rec, pnn, nodemap);
3644                 return;
3645         }
3646
3647         /* if recovery master is disconnected we must elect a new recmaster */
3648         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3649                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3650                 force_election(rec, pnn, nodemap);
3651                 return;
3652         }
3653
3654         /* get nodemap from the recovery master to check if it is inactive */
3655         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3656                                    mem_ctx, &recmaster_nodemap);
3657         if (ret != 0) {
3658                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3659                           nodemap->nodes[j].pnn));
3660                 return;
3661         }
3662
3663
3664         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3665             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3666                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3667                 /*
3668                  * update our nodemap to carry the recmaster's notion of
3669                  * its own flags, so that we don't keep freezing the
3670                  * inactive recmaster node...
3671                  */
3672                 nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
3673                 force_election(rec, pnn, nodemap);
3674                 return;
3675         }
3676
3677         /* verify that we have all the ip addresses we should have and
3678          * that we don't have addresses we shouldn't have.
3679          */
3680         if (ctdb->tunable.disable_ip_failover == 0) {
3681                 if (rec->ip_check_disable_ctx == NULL) {
3682                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3683                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3684                         }
3685                 }
3686         }
3687
3688
3689         /* if we are not the recmaster then we do not need to check
3690            if recovery is needed
3691          */
3692         if (pnn != rec->recmaster) {
3693                 return;
3694         }
3695
3696
3697         /* ensure our local copies of flags are right */
3698         ret = update_local_flags(rec, nodemap);
3699         if (ret == MONITOR_ELECTION_NEEDED) {
3700                 DEBUG(DEBUG_NOTICE,("update_local_flags() indicated that a re-election is needed\n"));
3701                 force_election(rec, pnn, nodemap);
3702                 return;
3703         }
3704         if (ret != MONITOR_OK) {
3705                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3706                 return;
3707         }
3708
3709         if (ctdb->num_nodes != nodemap->num) {
3710                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) - reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3711                 reload_nodes_file(ctdb);
3712                 return;
3713         }
3714
3715         /* verify that all active nodes agree that we are the recmaster */
3716         switch (verify_recmaster(rec, nodemap, pnn)) {
3717         case MONITOR_RECOVERY_NEEDED:
3718                 /* cannot happen */
3719                 return;
3720         case MONITOR_ELECTION_NEEDED:
3721                 force_election(rec, pnn, nodemap);
3722                 return;
3723         case MONITOR_OK:
3724                 break;
3725         case MONITOR_FAILED:
3726                 return;
3727         }
3728
3729
3730         if (rec->need_recovery) {
3731                 /* a previous recovery didn't finish */
3732                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3733                 return;
3734         }
3735
3736         /* verify that all active nodes are in normal mode 
3737            and not in recovery mode 
3738         */
3739         switch (verify_recmode(ctdb, nodemap)) {
3740         case MONITOR_RECOVERY_NEEDED:
3741                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3742                 return;
3743         case MONITOR_FAILED:
3744                 return;
3745         case MONITOR_ELECTION_NEEDED:
3746                 /* cannot happen */
3747         case MONITOR_OK:
3748                 break;
3749         }
3750
3751
3752         if (ctdb->tunable.verify_recovery_lock != 0) {
3753                 /* we should have the reclock - check it's not stale */
3754                 ret = check_recovery_lock(ctdb);
3755                 if (ret != 0) {
3756                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3757                         ctdb_set_culprit(rec, ctdb->pnn);
3758                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3759                         return;
3760                 }
3761         }
3762
3763
3764         /* is there a pending "reload all ips" request? */
3765         if (reload_all_ips_request != NULL) {
3766                 reload_all_ips(ctdb, rec, nodemap, reload_all_ips_request);
3767                 talloc_free(reload_all_ips_request);
3768                 reload_all_ips_request = NULL;
3769         }
3770
3771         /* if there are takeover runs requested, perform them and notify the waiters */
3772         if (rec->reallocate_requests) {
3773                 process_ipreallocate_requests(ctdb, rec);
3774         }
3775
3776         /* get the nodemap for all active remote nodes
3777          */
3778         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3779         if (remote_nodemaps == NULL) {
3780                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3781                 return;
3782         }
3783         for (i=0; i<nodemap->num; i++) {
3784                 remote_nodemaps[i] = NULL;
3785         }
3786         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3787                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3788                 return;
3789         } 
3790
3791         /* verify that all other nodes have the same nodemap as we have
3792         */
3793         for (j=0; j<nodemap->num; j++) {
3794                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3795                         continue;
3796                 }
3797
3798                 if (remote_nodemaps[j] == NULL) {
3799                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3800                         ctdb_set_culprit(rec, j);
3801
3802                         return;
3803                 }
3804
3805                 /* if the nodes disagree on how many nodes there are
3806                    then this is a good reason to try recovery
3807                  */
3808                 if (remote_nodemaps[j]->num != nodemap->num) {
3809                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u on the local node\n",
3810                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3811                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3812                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3813                         return;
3814                 }
3815
3816                 /* if the nodes disagree on which nodes exist and are
3817                    active, then that is also a good reason to do recovery
3818                  */
3819                 for (i=0;i<nodemap->num;i++) {
3820                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3821                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3822                                           nodemap->nodes[j].pnn, i, 
3823                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3824                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3825                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3826                                             vnnmap);
3827                                 return;
3828                         }
3829                 }
3830         }
3831
3832         /*
3833          * Update node flags obtained from each active node. This ensures we
3834          * have up-to-date information for all the nodes.
3835          */
3836         for (j=0; j<nodemap->num; j++) {
3837                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3838                         continue;
3839                 }
3840                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3841         }
3842
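        /* Flag reconciliation policy: each node is authoritative for its
         * own flags (the i == j case below), while this node's (the
         * recmaster's) view wins for flags describing third parties.  In
         * both cases the flags are pushed cluster-wide and a recovery is
         * triggered so everyone converges on a single view. */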
3843         for (j=0; j<nodemap->num; j++) {
3844                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3845                         continue;
3846                 }
3847
3848                 /* verify the flags are consistent
3849                 */
3850                 for (i=0; i<nodemap->num; i++) {
3851                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3852                                 continue;
3853                         }
3854                         
3855                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3856                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3857                                   nodemap->nodes[j].pnn, 
3858                                   nodemap->nodes[i].pnn, 
3859                                   remote_nodemaps[j]->nodes[i].flags,
3860                                   nodemap->nodes[i].flags));
3861                                 if (i == j) {
3862                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3863                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3864                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3865                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3866                                                     vnnmap);
3867                                         return;
3868                                 } else {
3869                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3870                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3871                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3872                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3873                                                     vnnmap);
3874                                         return;
3875                                 }
3876                         }
3877                 }
3878         }
3879
3880
3881         /* there had better be the same number of lmasters in the vnn map
3882            as there are active nodes or we will have to do a recovery
3883          */
3884         if (vnnmap->size != rec->num_active) {
3885                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3886                           vnnmap->size, rec->num_active));
3887                 ctdb_set_culprit(rec, ctdb->pnn);
3888                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3889                 return;
3890         }
3891
3892         /* verify that all active nodes in the nodemap also exist in 
3893            the vnnmap.
3894          */
3895         for (j=0; j<nodemap->num; j++) {
3896                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3897                         continue;
3898                 }
3899                 if (nodemap->nodes[j].pnn == pnn) {
3900                         continue;
3901                 }
3902
3903                 for (i=0; i<vnnmap->size; i++) {
3904                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3905                                 break;
3906                         }
3907                 }
3908                 if (i == vnnmap->size) {
3909                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3910                                   nodemap->nodes[j].pnn));
3911                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3912                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3913                         return;
3914                 }
3915         }
3916
3917         
3918         /* verify that all other nodes have the same vnnmap
3919            and are from the same generation
3920          */
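        /* (the generation is reassigned on every recovery, so a mismatch
         * below means the remote node missed, or took part in a
         * different, recovery) */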
3921         for (j=0; j<nodemap->num; j++) {
3922                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3923                         continue;
3924                 }
3925                 if (nodemap->nodes[j].pnn == pnn) {
3926                         continue;
3927                 }
3928
3929                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3930                                           mem_ctx, &remote_vnnmap);
3931                 if (ret != 0) {
3932                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3933                                   nodemap->nodes[j].pnn));
3934                         return;
3935                 }
3936
3937                 /* verify the vnnmap generation is the same */
3938                 if (vnnmap->generation != remote_vnnmap->generation) {
3939                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3940                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3941                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3942                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3943                         return;
3944                 }
3945
3946                 /* verify the vnnmap size is the same */
3947                 if (vnnmap->size != remote_vnnmap->size) {
3948                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3949                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3950                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3951                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3952                         return;
3953                 }
3954
3955                 /* verify the vnnmap is the same */
3956                 for (i=0;i<vnnmap->size;i++) {
3957                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3958                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3959                                           nodemap->nodes[j].pnn));
3960                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3961                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3962                                             vnnmap);
3963                                 return;
3964                         }
3965                 }
3966         }
3967
3968         /* we might need to change who has what IP assigned */
3969         if (rec->need_takeover_run) {
3970                 uint32_t culprit = (uint32_t)-1;
3971
3972                 rec->need_takeover_run = false;
3973
3974                 /* update the list of public ips that a node can handle for
3975                    all connected nodes
3976                 */
3977                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3978                 if (ret != 0) {
3979                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3980                                          culprit));
3981                         rec->need_takeover_run = true;
3982                         return;
3983                 }
3984
3985                 /* execute the "startrecovery" event script on all nodes */
3986                 ret = run_startrecovery_eventscript(rec, nodemap);
3987                 if (ret!=0) {
3988                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3989                         ctdb_set_culprit(rec, ctdb->pnn);
3990                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3991                         return;
3992                 }
3993
3994                 /* If the takeover run fails, the offending nodes are
3995                  * assigned ban culprit counts and the takeover is retried.
3996                  * If it fails repeatedly, the offending node will
3997                  * eventually be banned.
3998                  *
3999                  * If rec->need_takeover_run is not set back to true on
4000                  * failure, monitoring stays disabled cluster-wide (via the
4001                  * startrecovery eventscript) and will never be re-enabled.
4002                  */
4003                 if (!do_takeover_run(rec, nodemap, true)) {
4004                         return;
4005                 }
4006
4007                 /* execute the "recovered" event script on all nodes */
4008                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
4009 #if 0
4010 // we can't check whether the event completed successfully
4011 // since this script WILL fail if the node is in recovery mode
4012 // and if that race happens, the code here would just cause a second
4013 // cascading recovery.
4014                 if (ret!=0) {
4015                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
4016                         ctdb_set_culprit(rec, ctdb->pnn);
4017                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4018                 }
4019 #endif
4020         }
4021 }
4022
4023 /*
4024   the main monitoring loop
4025  */
4026 static void monitor_cluster(struct ctdb_context *ctdb)
4027 {
4028         struct ctdb_recoverd *rec;
4029
4030         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
4031
4032         rec = talloc_zero(ctdb, struct ctdb_recoverd);
4033         CTDB_NO_MEMORY_FATAL(ctdb, rec);
4034
4035         rec->ctdb = ctdb;
4036
4037         rec->takeover_run_in_progress = false;
4038
4039         rec->priority_time = timeval_current();
4040
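	/* Register handlers for the SRVID messages this daemon services.
	 * These are presumably dispatched from this process's event loop
	 * via the client connection to the main ctdbd.
	 */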
	/* register a message port for sending memory dumps */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);

	/* register a message port for requesting logs */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_GETLOG, getlog_handler, rec);

	/* register a message port for clearing logs */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_CLEARLOG, clearlog_handler, rec);

	/* register a message port for recovery elections */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);

	/* when nodes are disabled/enabled */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);

	/* when we are asked to push out a flag change */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);

	/* register a message port for vacuum fetch */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);

	/* register a message port for reloadnodes  */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);

	/* register a message port for performing a takeover run */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);

	/* register a message port for performing a reload all ips */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_ALL_IPS, ip_reloadall_handler, rec);

	/* register a message port for disabling the ip check for a short while */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);

	/* register a message port for updating the recovery daemons node assignment for an ip */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);

	/* register a message port for forcing a rebalance of a node next
	   reallocation */
	ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);

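	/* Each loop iteration gets its own talloc context, so anything
	 * allocated during main_loop() is reclaimed by a single
	 * talloc_free() at the end of the iteration.
	 */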
	for (;;) {
		TALLOC_CTX *mem_ctx = talloc_new(ctdb);
		struct timeval start;
		double elapsed;

		if (!mem_ctx) {
			DEBUG(DEBUG_CRIT,(__location__
					  " Failed to create temp context\n"));
			exit(-1);
		}

		start = timeval_current();
		main_loop(ctdb, rec, mem_ctx);
		talloc_free(mem_ctx);

		/* we only check for recovery once every
		   recover_interval seconds */
		elapsed = timeval_elapsed(&start);
		if (elapsed < ctdb->tunable.recover_interval) {
			ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
					  - elapsed);
		}
	}
}

/*
  event handler for when the main ctdbd dies
 */
static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde,
				 uint16_t flags, void *private_data)
{
	DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
	_exit(1);
}

/*
  called regularly to verify that the recovery daemon is still running
 */
static void ctdb_check_recd(struct event_context *ev, struct timed_event *te,
			      struct timeval yt, void *p)
{
	struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);

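	/* Signal 0 delivers no signal but still performs the existence
	 * and permission checks, so this is a cheap liveness probe for
	 * the recovery daemon's pid.
	 */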
	if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
		DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));

		event_add_timed(ctdb->ev, ctdb, timeval_zero(),
				ctdb_restart_recd, ctdb);

		return;
	}

	event_add_timed(ctdb->ev, ctdb->recd_ctx,
			timeval_current_ofs(30, 0),
			ctdb_check_recd, ctdb);
}

static void recd_sig_child_handler(struct event_context *ev,
	struct signal_event *se, int signum, int count,
	void *dont_care,
	void *private_data)
{
//	struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
	int status;
	pid_t pid = -1;

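	/* Reap every child that has exited: loop on non-blocking
	 * waitpid() until it returns 0 (children remain but none have
	 * exited) or fails with ECHILD (no children left).
	 */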
	while (pid != 0) {
		pid = waitpid(-1, &status, WNOHANG);
		if (pid == -1) {
			if (errno != ECHILD) {
				DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
			}
			return;
		}
		if (pid > 0) {
			DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
		}
	}
}

/*
  startup the recovery daemon as a child of the main ctdb daemon
 */
int ctdb_start_recoverd(struct ctdb_context *ctdb)
{
	int fd[2];
	struct signal_event *se;
	struct tevent_fd *fde;

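	/* This pipe is used to detect the death of the parent: the
	 * parent keeps the write end open and the child watches the
	 * read end.  When the parent exits, the read end becomes
	 * readable (EOF) and ctdb_recoverd_parent() fires.
	 */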
	if (pipe(fd) != 0) {
		return -1;
	}

	ctdb->ctdbd_pid = getpid();

	ctdb->recoverd_pid = ctdb_fork_no_free_ringbuffer(ctdb);
	if (ctdb->recoverd_pid == -1) {
		return -1;
	}

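	/* Parent: close the read end, schedule the periodic liveness
	 * check and return.  The write end of the pipe stays open for
	 * the lifetime of this process.
	 */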
	if (ctdb->recoverd_pid != 0) {
		talloc_free(ctdb->recd_ctx);
		ctdb->recd_ctx = talloc_new(ctdb);
		CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);

		close(fd[0]);
		event_add_timed(ctdb->ev, ctdb->recd_ctx,
				timeval_current_ofs(30, 0),
				ctdb_check_recd, ctdb);
		return 0;
	}

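	/* Child: close the write end so that the read end sees EOF as
	 * soon as the parent goes away.
	 */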
	close(fd[1]);

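	/* Re-seed the PRNG: the child inherits the parent's random
	 * state across fork(), so without this every recovery daemon
	 * would draw the same sequence of random numbers.
	 */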
	srandom(getpid() ^ time(NULL));

	/* Clear the log ringbuffer */
	ctdb_clear_log(ctdb);

	ctdb_set_process_name("ctdb_recoverd");
	if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
		DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
		exit(1);
	}

	DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));

	fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
		     ctdb_recoverd_parent, &fd[0]);
	tevent_fd_set_auto_close(fde);

	/* set up a handler to pick up sigchld */
	se = event_add_signal(ctdb->ev, ctdb,
				     SIGCHLD, 0,
				     recd_sig_child_handler,
				     ctdb);
	if (se == NULL) {
		DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
		exit(1);
	}

	monitor_cluster(ctdb);

	DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
	return -1;
}

/*
  shutdown the recovery daemon
 */
void ctdb_stop_recoverd(struct ctdb_context *ctdb)
{
	if (ctdb->recoverd_pid == 0) {
		return;
	}

	DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
	ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);

	TALLOC_FREE(ctdb->recd_ctx);
	TALLOC_FREE(ctdb->recd_ping_count);
}

static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te,
		       struct timeval t, void *private_data)
{
	struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);

	DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
	ctdb_stop_recoverd(ctdb);
	ctdb_start_recoverd(ctdb);
}