recoverd: Improve logging for takeover runs
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "dlinklist.h"
31
32
33 /* List of SRVID requests that need to be processed */
34 struct srvid_list {
35         struct srvid_list *next, *prev;
36         struct srvid_request *request;
37 };
38
39 struct srvid_requests {
40         struct srvid_list *requests;
41 };
42
43 static void srvid_request_reply(struct ctdb_context *ctdb,
44                                 struct srvid_request *request,
45                                 TDB_DATA result)
46 {
47         /* Someone that sent srvid==0 does not want a reply */
48         if (request->srvid == 0) {
49                 talloc_free(request);
50                 return;
51         }
52
53         if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
54                                      result) == 0) {
55                 DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
56                                   (unsigned)request->pnn,
57                                   (unsigned long long)request->srvid));
58         } else {
59                 DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
60                                  (unsigned)request->pnn,
61                                  (unsigned long long)request->srvid));
62         }
63
64         talloc_free(request);
65 }
66
67 static void srvid_requests_reply(struct ctdb_context *ctdb,
68                                  struct srvid_requests **requests,
69                                  TDB_DATA result)
70 {
71         struct srvid_list *r;
72
73         for (r = (*requests)->requests; r != NULL; r = r->next) {
74                 srvid_request_reply(ctdb, r->request, result);
75         }
76
77         /* Free the list structure... */
78         TALLOC_FREE(*requests);
79 }
80
81 static void srvid_request_add(struct ctdb_context *ctdb,
82                               struct srvid_requests **requests,
83                               struct srvid_request *request)
84 {
85         struct srvid_list *t;
86         int32_t ret;
87         TDB_DATA result;
88
89         if (*requests == NULL) {
90                 *requests = talloc_zero(ctdb, struct srvid_requests);
91                 if (*requests == NULL) {
92                         goto nomem;
93                 }
94         }
95
96         t = talloc_zero(*requests, struct srvid_list);
97         if (t == NULL) {
98                 /* If *requests was just allocated above then free it */
99                 if ((*requests)->requests == NULL) {
100                         TALLOC_FREE(*requests);
101                 }
102                 goto nomem;
103         }
104
105         t->request = (struct srvid_request *)talloc_steal(t, request);
106         DLIST_ADD((*requests)->requests, t);
107
108         return;
109
110 nomem:
111         /* Failed to add the request to the list.  Send a fail. */
112         DEBUG(DEBUG_ERR, (__location__
113                           " Out of memory, failed to queue SRVID request\n"));
114         ret = -ENOMEM;
115         result.dsize = sizeof(ret);
116         result.dptr = (uint8_t *)&ret;
117         srvid_request_reply(ctdb, request, result);
118 }
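/* Illustrative usage sketch (not part of the original file; the exact wiring
 * is an assumption based on the helpers above and the reallocate_requests
 * field declared below): a handler for a takeover-run request would steal the
 * incoming struct srvid_request and queue it, and once the run completes every
 * queued requester is answered with the int32_t result:
 *
 *   srvid_request_add(ctdb, &rec->reallocate_requests, request);
 *   ...
 *   int32_t ret = ...;   // result of the takeover run
 *   TDB_DATA result = { .dptr = (uint8_t *)&ret, .dsize = sizeof(ret) };
 *   srvid_requests_reply(ctdb, &rec->reallocate_requests, result);
 */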
119
120 struct ctdb_banning_state {
121         uint32_t count;
122         struct timeval last_reported_time;
123 };
124
125 /*
126   private state of recovery daemon
127  */
128 struct ctdb_recoverd {
129         struct ctdb_context *ctdb;
130         uint32_t recmaster;
131         uint32_t num_active;
132         uint32_t num_connected;
133         uint32_t last_culprit_node;
134         struct ctdb_node_map *nodemap;
135         struct timeval priority_time;
136         bool need_takeover_run;
137         bool need_recovery;
138         uint32_t node_flags;
139         struct timed_event *send_election_te;
140         struct timed_event *election_timeout;
141         struct vacuum_info *vacuum_info;
142         struct srvid_requests *reallocate_requests;
143         bool takeover_run_in_progress;
144         TALLOC_CTX *takeover_runs_disable_ctx;
145         struct ctdb_control_get_ifaces *ifaces;
146         uint32_t *force_rebalance_nodes;
147 };
148
149 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
150 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
151
152 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
153
154 /*
155   ban a node for a period of time
156  */
157 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
158 {
159         int ret;
160         struct ctdb_context *ctdb = rec->ctdb;
161         struct ctdb_ban_time bantime;
162        
163         if (!ctdb_validate_pnn(ctdb, pnn)) {
164                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
165                 return;
166         }
167
168         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
169
170         bantime.pnn  = pnn;
171         bantime.time = ban_time;
172
173         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
174         if (ret != 0) {
175                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
176                 return;
177         }
178
179 }
180
181 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
182
183
184 /*
185   remember the trouble maker
186  */
187 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
188 {
189         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
190         struct ctdb_banning_state *ban_state;
191
192         if (culprit >= ctdb->num_nodes) {
193                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
194                 return;
195         }
196
197         /* If we are banned or stopped, do not set other nodes as culprits */
198         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
199                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
200                 return;
201         }
202
203         if (ctdb->nodes[culprit]->ban_state == NULL) {
204                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
205                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
206
207                 
208         }
209         ban_state = ctdb->nodes[culprit]->ban_state;
210         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
211                 /* this was the first time in a long while this node
212                    misbehaved so we will forgive any old transgressions.
213                 */
214                 ban_state->count = 0;
215         }
216
217         ban_state->count += count;
218         ban_state->last_reported_time = timeval_current();
219         rec->last_culprit_node = culprit;
220 }
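/* Illustrative example of the grace-period logic above (not part of the
 * original file): if a node is blamed for two failed recoveries a few seconds
 * apart, its ban_state->count grows to 2.  If it then behaves for longer than
 * the recovery_grace_period tunable, the next transgression resets the count
 * and starts again from 1, so only sustained misbehaviour accumulates credits.
 */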
221
222 /*
223   remember the trouble maker
224  */
225 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
226 {
227         ctdb_set_culprit_count(rec, culprit, 1);
228 }
229
230
231 /* this callback is called for every node that failed to execute the
232    recovered event
233 */
234 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
235 {
236         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
237
238         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
239
240         ctdb_set_culprit(rec, node_pnn);
241 }
242
243 /*
244   run the "recovered" eventscript on all nodes
245  */
246 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
247 {
248         TALLOC_CTX *tmp_ctx;
249         uint32_t *nodes;
250         struct ctdb_context *ctdb = rec->ctdb;
251
252         tmp_ctx = talloc_new(ctdb);
253         CTDB_NO_MEMORY(ctdb, tmp_ctx);
254
255         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
256         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
257                                         nodes, 0,
258                                         CONTROL_TIMEOUT(), false, tdb_null,
259                                         NULL, recovered_fail_callback,
260                                         rec) != 0) {
261                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
262
263                 talloc_free(tmp_ctx);
264                 return -1;
265         }
266
267         talloc_free(tmp_ctx);
268         return 0;
269 }
270
271 /* this callback is called for every node that failed to execute the
272    start recovery event
273 */
274 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
275 {
276         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
277
278         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
279
280         ctdb_set_culprit(rec, node_pnn);
281 }
282
283 /*
284   run the "startrecovery" eventscript on all nodes
285  */
286 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
287 {
288         TALLOC_CTX *tmp_ctx;
289         uint32_t *nodes;
290         struct ctdb_context *ctdb = rec->ctdb;
291
292         tmp_ctx = talloc_new(ctdb);
293         CTDB_NO_MEMORY(ctdb, tmp_ctx);
294
295         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
296         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
297                                         nodes, 0,
298                                         CONTROL_TIMEOUT(), false, tdb_null,
299                                         NULL,
300                                         startrecovery_fail_callback,
301                                         rec) != 0) {
302                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
303                 talloc_free(tmp_ctx);
304                 return -1;
305         }
306
307         talloc_free(tmp_ctx);
308         return 0;
309 }
310
311 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
312 {
313         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
314                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
315                 return;
316         }
317         if (node_pnn < ctdb->num_nodes) {
318                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
319         }
320
321         if (node_pnn == ctdb->pnn) {
322                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
323         }
324 }
325
326 /*
327   update the node capabilities for all connected nodes
328  */
329 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
330 {
331         uint32_t *nodes;
332         TALLOC_CTX *tmp_ctx;
333
334         tmp_ctx = talloc_new(ctdb);
335         CTDB_NO_MEMORY(ctdb, tmp_ctx);
336
337         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
338         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
339                                         nodes, 0,
340                                         CONTROL_TIMEOUT(),
341                                         false, tdb_null,
342                                         async_getcap_callback, NULL,
343                                         NULL) != 0) {
344                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
345                 talloc_free(tmp_ctx);
346                 return -1;
347         }
348
349         talloc_free(tmp_ctx);
350         return 0;
351 }
352
353 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
354 {
355         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
356
357         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
358         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
359 }
360
361 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
362 {
363         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
364
365         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
366         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
367 }
368
369 /*
370   change recovery mode on all nodes
371  */
372 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
373 {
374         TDB_DATA data;
375         uint32_t *nodes;
376         TALLOC_CTX *tmp_ctx;
377
378         tmp_ctx = talloc_new(ctdb);
379         CTDB_NO_MEMORY(ctdb, tmp_ctx);
380
381         /* freeze all nodes */
382         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
383         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
384                 int i;
385
386                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
387                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
388                                                 nodes, i,
389                                                 CONTROL_TIMEOUT(),
390                                                 false, tdb_null,
391                                                 NULL,
392                                                 set_recmode_fail_callback,
393                                                 rec) != 0) {
394                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
395                                 talloc_free(tmp_ctx);
396                                 return -1;
397                         }
398                 }
399         }
400
401
402         data.dsize = sizeof(uint32_t);
403         data.dptr = (unsigned char *)&rec_mode;
404
405         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
406                                         nodes, 0,
407                                         CONTROL_TIMEOUT(),
408                                         false, data,
409                                         NULL, NULL,
410                                         NULL) != 0) {
411                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
412                 talloc_free(tmp_ctx);
413                 return -1;
414         }
415
416         talloc_free(tmp_ctx);
417         return 0;
418 }
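/* Illustrative sketch (an assumption about how the helper above is used,
 * based on the freeze logic it contains): entering recovery would freeze all
 * database priorities and switch every active node to recovery mode, and
 * leaving recovery switches them back once the databases have been rebuilt:
 *
 *   if (set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE) != 0) {
 *           ...abort the recovery attempt...
 *   }
 *   ...pull, wipe and push the databases...
 *   if (set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL) != 0) {
 *           ...abort...
 *   }
 */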
419
420 /*
421   change recovery master on all node
422  */
423 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
424 {
425         TDB_DATA data;
426         TALLOC_CTX *tmp_ctx;
427         uint32_t *nodes;
428
429         tmp_ctx = talloc_new(ctdb);
430         CTDB_NO_MEMORY(ctdb, tmp_ctx);
431
432         data.dsize = sizeof(uint32_t);
433         data.dptr = (unsigned char *)&pnn;
434
435         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
436         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
437                                         nodes, 0,
438                                         CONTROL_TIMEOUT(), false, data,
439                                         NULL, NULL,
440                                         NULL) != 0) {
441                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
442                 talloc_free(tmp_ctx);
443                 return -1;
444         }
445
446         talloc_free(tmp_ctx);
447         return 0;
448 }
449
450 /* update all remote nodes to use the same db priority that we have
451    this can fail if the remote node has not yet been upgraded to 
452    support this function, so we always return success and never fail
453    a recovery if this call fails.
454 */
455 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
456         struct ctdb_node_map *nodemap, 
457         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
458 {
459         int db;
460         uint32_t *nodes;
461
462         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
463
464         /* step through all local databases */
465         for (db=0; db<dbmap->num;db++) {
466                 TDB_DATA data;
467                 struct ctdb_db_priority db_prio;
468                 int ret;
469
470                 db_prio.db_id     = dbmap->dbs[db].dbid;
471                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
472                 if (ret != 0) {
473                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
474                         continue;
475                 }
476
477                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
478
479                 data.dptr  = (uint8_t *)&db_prio;
480                 data.dsize = sizeof(db_prio);
481
482                 if (ctdb_client_async_control(ctdb,
483                                         CTDB_CONTROL_SET_DB_PRIORITY,
484                                         nodes, 0,
485                                         CONTROL_TIMEOUT(), false, data,
486                                         NULL, NULL,
487                                         NULL) != 0) {
488                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
489                 }
490         }
491
492         return 0;
493 }                       
494
495 /*
496   ensure all other nodes have attached to any databases that we have
497  */
498 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
499                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
500 {
501         int i, j, db, ret;
502         struct ctdb_dbid_map *remote_dbmap;
503
504         /* verify that all other nodes have all our databases */
505         for (j=0; j<nodemap->num; j++) {
506                 /* we don't need to check ourselves */
507                 if (nodemap->nodes[j].pnn == pnn) {
508                         continue;
509                 }
510                 /* dont check nodes that are unavailable */
511                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
512                         continue;
513                 }
514
515                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
516                                          mem_ctx, &remote_dbmap);
517                 if (ret != 0) {
518                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
519                         return -1;
520                 }
521
522                 /* step through all local databases */
523                 for (db=0; db<dbmap->num;db++) {
524                         const char *name;
525
526
527                         for (i=0;i<remote_dbmap->num;i++) {
528                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
529                                         break;
530                                 }
531                         }
532                         /* the remote node already has this database */
533                         if (i!=remote_dbmap->num) {
534                                 continue;
535                         }
536                         /* ok so we need to create this database */
537                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
538                                             mem_ctx, &name);
539                         if (ret != 0) {
540                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
541                                 return -1;
542                         }
543                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
544                                            mem_ctx, name,
545                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
546                         if (ret != 0) {
547                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
548                                 return -1;
549                         }
550                 }
551         }
552
553         return 0;
554 }
555
556
557 /*
558   ensure we are attached to any databases that anyone else is attached to
559  */
560 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
561                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
562 {
563         int i, j, db, ret;
564         struct ctdb_dbid_map *remote_dbmap;
565
566         /* verify that we have all databases any other node has */
567         for (j=0; j<nodemap->num; j++) {
568                 /* we don't need to check ourselves */
569                 if (nodemap->nodes[j].pnn == pnn) {
570                         continue;
571                 }
572                 /* dont check nodes that are unavailable */
573                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
574                         continue;
575                 }
576
577                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
578                                          mem_ctx, &remote_dbmap);
579                 if (ret != 0) {
580                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
581                         return -1;
582                 }
583
584                 /* step through all databases on the remote node */
585                 for (db=0; db<remote_dbmap->num;db++) {
586                         const char *name;
587
588                         for (i=0;i<(*dbmap)->num;i++) {
589                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
590                                         break;
591                                 }
592                         }
593                         /* we already have this db locally */
594                         if (i!=(*dbmap)->num) {
595                                 continue;
596                         }
597                         /* ok so we need to create this database and
598                            rebuild dbmap
599                          */
600                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
601                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
602                         if (ret != 0) {
603                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
604                                           nodemap->nodes[j].pnn));
605                                 return -1;
606                         }
607                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
608                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
609                         if (ret != 0) {
610                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
611                                 return -1;
612                         }
613                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
614                         if (ret != 0) {
615                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
616                                 return -1;
617                         }
618                 }
619         }
620
621         return 0;
622 }
623
624
625 /*
626   pull the remote database contents from one node into the recdb
627  */
628 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
629                                     struct tdb_wrap *recdb, uint32_t dbid)
630 {
631         int ret;
632         TDB_DATA outdata;
633         struct ctdb_marshall_buffer *reply;
634         struct ctdb_rec_data *rec;
635         int i;
636         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
637
638         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
639                                CONTROL_TIMEOUT(), &outdata);
640         if (ret != 0) {
641                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
642                 talloc_free(tmp_ctx);
643                 return -1;
644         }
645
646         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
647
648         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
649                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
650                 talloc_free(tmp_ctx);
651                 return -1;
652         }
653         
654         rec = (struct ctdb_rec_data *)&reply->data[0];
655         
656         for (i=0;
657              i<reply->count;
658              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
659                 TDB_DATA key, data;
660                 struct ctdb_ltdb_header *hdr;
661                 TDB_DATA existing;
662                 
663                 key.dptr = &rec->data[0];
664                 key.dsize = rec->keylen;
665                 data.dptr = &rec->data[key.dsize];
666                 data.dsize = rec->datalen;
667                 
668                 hdr = (struct ctdb_ltdb_header *)data.dptr;
669
670                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
671                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
672                         talloc_free(tmp_ctx);
673                         return -1;
674                 }
675
676                 /* fetch the existing record, if any */
677                 existing = tdb_fetch(recdb->tdb, key);
678                 
679                 if (existing.dptr != NULL) {
680                         struct ctdb_ltdb_header header;
681                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
682                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
683                                          (unsigned)existing.dsize, srcnode));
684                                 free(existing.dptr);
685                                 talloc_free(tmp_ctx);
686                                 return -1;
687                         }
688                         header = *(struct ctdb_ltdb_header *)existing.dptr;
689                         free(existing.dptr);
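                        /* Merge rule: overwrite the copy already in recdb
                         * only if the pulled record has a higher RSN, or the
                         * same RSN while the existing copy's dmaster is not
                         * the recovery master; otherwise keep the copy we
                         * already have.
                         */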
690                         if (!(header.rsn < hdr->rsn ||
691                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
692                                 continue;
693                         }
694                 }
695                 
696                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
697                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
698                         talloc_free(tmp_ctx);
699                         return -1;                              
700                 }
701         }
702
703         talloc_free(tmp_ctx);
704
705         return 0;
706 }
707
708
709 struct pull_seqnum_cbdata {
710         int failed;
711         uint32_t pnn;
712         uint64_t seqnum;
713 };
714
715 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
716 {
717         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
718         uint64_t seqnum;
719
720         if (cb_data->failed != 0) {
721                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
722                 return;
723         }
724
725         if (res != 0) {
726                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
727                 cb_data->failed = 1;
728                 return;
729         }
730
731         if (outdata.dsize != sizeof(uint64_t)) {
732                 DEBUG(DEBUG_ERR, ("Invalid seqnum reply from node %d: got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
733                 cb_data->failed = 1;
734                 return;
735         }
736
737         seqnum = *((uint64_t *)outdata.dptr);
738
739         if (seqnum > cb_data->seqnum) {
740                 cb_data->seqnum = seqnum;
741                 cb_data->pnn = node_pnn;
742         }
743 }
744
745 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
746 {
747         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
748
749         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
750         cb_data->failed = 1;
751 }
752
753 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
754                                 struct ctdb_recoverd *rec, 
755                                 struct ctdb_node_map *nodemap, 
756                                 struct tdb_wrap *recdb, uint32_t dbid)
757 {
758         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
759         uint32_t *nodes;
760         TDB_DATA data;
761         uint32_t outdata[2];
762         struct pull_seqnum_cbdata *cb_data;
763
764         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
765
766         outdata[0] = dbid;
767         outdata[1] = 0;
768
769         data.dsize = sizeof(outdata);
770         data.dptr  = (uint8_t *)&outdata[0];
771
772         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
773         if (cb_data == NULL) {
774                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
775                 talloc_free(tmp_ctx);
776                 return -1;
777         }
778
779         cb_data->failed = 0;
780         cb_data->pnn    = -1;
781         cb_data->seqnum = 0;
782         
783         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
784         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
785                                         nodes, 0,
786                                         CONTROL_TIMEOUT(), false, data,
787                                         pull_seqnum_cb,
788                                         pull_seqnum_fail_cb,
789                                         cb_data) != 0) {
790                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
791
792                 talloc_free(tmp_ctx);
793                 return -1;
794         }
795
796         if (cb_data->failed != 0) {
797                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
798                 talloc_free(tmp_ctx);
799                 return -1;
800         }
801
802         if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
803                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
804                 talloc_free(tmp_ctx);
805                 return -1;
806         }
807
808         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
809
810         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
811                 DEBUG(DEBUG_ERR, ("Failed to pull highest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
812                 talloc_free(tmp_ctx);
813                 return -1;
814         }
815
816         talloc_free(tmp_ctx);
817         return 0;
818 }
819
820
821 /*
822   pull all the remote database contents into the recdb
823  */
824 static int pull_remote_database(struct ctdb_context *ctdb,
825                                 struct ctdb_recoverd *rec, 
826                                 struct ctdb_node_map *nodemap, 
827                                 struct tdb_wrap *recdb, uint32_t dbid,
828                                 bool persistent)
829 {
830         int j;
831
832         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
833                 int ret;
834                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
835                 if (ret == 0) {
836                         return 0;
837                 }
838         }
839
840         /* pull all records from all other nodes across onto this node
841            (this merges based on rsn)
842         */
843         for (j=0; j<nodemap->num; j++) {
844                 /* dont merge from nodes that are unavailable */
845                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
846                         continue;
847                 }
848                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
849                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
850                                  nodemap->nodes[j].pnn));
851                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
852                         return -1;
853                 }
854         }
855         
856         return 0;
857 }
858
859
860 /*
861   update flags on all active nodes
862  */
863 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
864 {
865         int ret;
866
867         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
868         if (ret != 0) {
869                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
870                 return -1;
871         }
872
873         return 0;
874 }
875
876 /*
877   ensure all nodes have the same vnnmap we do
878  */
879 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
880                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
881 {
882         int j, ret;
883
884         /* push the new vnn map out to all the nodes */
885         for (j=0; j<nodemap->num; j++) {
886                 /* dont push to nodes that are unavailable */
887                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
888                         continue;
889                 }
890
891                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
892                 if (ret != 0) {
893                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
894                         return -1;
895                 }
896         }
897
898         return 0;
899 }
900
901
902 struct vacuum_info {
903         struct vacuum_info *next, *prev;
904         struct ctdb_recoverd *rec;
905         uint32_t srcnode;
906         struct ctdb_db_context *ctdb_db;
907         struct ctdb_marshall_buffer *recs;
908         struct ctdb_rec_data *r;
909 };
910
911 static void vacuum_fetch_next(struct vacuum_info *v);
912
913 /*
914   called when a vacuum fetch has completed - just free it and do the next one
915  */
916 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
917 {
918         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
919         talloc_free(state);
920         vacuum_fetch_next(v);
921 }
922
923
924 /*
925   process the next element from the vacuum list
926 */
927 static void vacuum_fetch_next(struct vacuum_info *v)
928 {
929         struct ctdb_call call;
930         struct ctdb_rec_data *r;
931
932         while (v->recs->count) {
933                 struct ctdb_client_call_state *state;
934                 TDB_DATA data;
935                 struct ctdb_ltdb_header *hdr;
936
937                 ZERO_STRUCT(call);
938                 call.call_id = CTDB_NULL_FUNC;
939                 call.flags = CTDB_IMMEDIATE_MIGRATION;
940                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
941
942                 r = v->r;
943                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
944                 v->recs->count--;
945
946                 call.key.dptr = &r->data[0];
947                 call.key.dsize = r->keylen;
948
949                 /* ensure we don't block this daemon - just skip a record if we can't get
950                    the chainlock */
951                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
952                         continue;
953                 }
954
955                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
956                 if (data.dptr == NULL) {
957                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
958                         continue;
959                 }
960
961                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
962                         free(data.dptr);
963                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
964                         continue;
965                 }
966                 
967                 hdr = (struct ctdb_ltdb_header *)data.dptr;
968                 if (hdr->dmaster == v->rec->ctdb->pnn) {
969                         /* its already local */
970                         free(data.dptr);
971                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
972                         continue;
973                 }
974
975                 free(data.dptr);
976
977                 state = ctdb_call_send(v->ctdb_db, &call);
978                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
979                 if (state == NULL) {
980                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
981                         talloc_free(v);
982                         return;
983                 }
984                 state->async.fn = vacuum_fetch_callback;
985                 state->async.private_data = v;
986                 return;
987         }
988
989         talloc_free(v);
990 }
991
992
993 /*
994   destroy a vacuum info structure
995  */
996 static int vacuum_info_destructor(struct vacuum_info *v)
997 {
998         DLIST_REMOVE(v->rec->vacuum_info, v);
999         return 0;
1000 }
1001
1002
1003 /*
1004   handler for vacuum fetch
1005 */
1006 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1007                                  TDB_DATA data, void *private_data)
1008 {
1009         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1010         struct ctdb_marshall_buffer *recs;
1011         int ret, i;
1012         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1013         const char *name;
1014         struct ctdb_dbid_map *dbmap=NULL;
1015         bool persistent = false;
1016         struct ctdb_db_context *ctdb_db;
1017         struct ctdb_rec_data *r;
1018         uint32_t srcnode;
1019         struct vacuum_info *v;
1020
1021         recs = (struct ctdb_marshall_buffer *)data.dptr;
1022         r = (struct ctdb_rec_data *)&recs->data[0];
1023
1024         if (recs->count == 0) {
1025                 talloc_free(tmp_ctx);
1026                 return;
1027         }
1028
1029         srcnode = r->reqid;
1030
1031         for (v=rec->vacuum_info;v;v=v->next) {
1032                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
1033                         /* we're already working on records from this node */
1034                         talloc_free(tmp_ctx);
1035                         return;
1036                 }
1037         }
1038
1039         /* work out if the database is persistent */
1040         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
1041         if (ret != 0) {
1042                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
1043                 talloc_free(tmp_ctx);
1044                 return;
1045         }
1046
1047         for (i=0;i<dbmap->num;i++) {
1048                 if (dbmap->dbs[i].dbid == recs->db_id) {
1049                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1050                         break;
1051                 }
1052         }
1053         if (i == dbmap->num) {
1054                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
1055                 talloc_free(tmp_ctx);
1056                 return;         
1057         }
1058
1059         /* find the name of this database */
1060         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
1061                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
1062                 talloc_free(tmp_ctx);
1063                 return;
1064         }
1065
1066         /* attach to it */
1067         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
1068         if (ctdb_db == NULL) {
1069                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
1070                 talloc_free(tmp_ctx);
1071                 return;
1072         }
1073
1074         v = talloc_zero(rec, struct vacuum_info);
1075         if (v == NULL) {
1076                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1077                 talloc_free(tmp_ctx);
1078                 return;
1079         }
1080
1081         v->rec = rec;
1082         v->srcnode = srcnode;
1083         v->ctdb_db = ctdb_db;
1084         v->recs = talloc_memdup(v, recs, data.dsize);
1085         if (v->recs == NULL) {
1086                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1087                 talloc_free(v);
1088                 talloc_free(tmp_ctx);
1089                 return;         
1090         }
1091         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
1092
1093         DLIST_ADD(rec->vacuum_info, v);
1094
1095         talloc_set_destructor(v, vacuum_info_destructor);
1096
1097         vacuum_fetch_next(v);
1098         talloc_free(tmp_ctx);
1099 }
1100
1101
1102 /*
1103   called when ctdb_wait_timeout should finish
1104  */
1105 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1106                               struct timeval yt, void *p)
1107 {
1108         uint32_t *timed_out = (uint32_t *)p;
1109         (*timed_out) = 1;
1110 }
1111
1112 /*
1113   wait for a given number of seconds
1114  */
1115 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1116 {
1117         uint32_t timed_out = 0;
1118         time_t usecs = (secs - (time_t)secs) * 1000000;
1119         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1120         while (!timed_out) {
1121                 event_loop_once(ctdb->ev);
1122         }
1123 }
1124
1125 /*
1126   called when an election times out (ends)
1127  */
1128 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1129                                   struct timeval t, void *p)
1130 {
1131         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1132         rec->election_timeout = NULL;
1133         fast_start = false;
1134
1135         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1136 }
1137
1138
1139 /*
1140   wait for an election to finish. It finished election_timeout seconds after
1141   the last election packet is received
1142  */
1143 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1144 {
1145         struct ctdb_context *ctdb = rec->ctdb;
1146         while (rec->election_timeout) {
1147                 event_loop_once(ctdb->ev);
1148         }
1149 }
1150
1151 /*
1152   Update our local flags from all remote connected nodes. 
1153   This is only run when we are, or believe we are, the recovery master
1154  */
1155 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1156 {
1157         int j;
1158         struct ctdb_context *ctdb = rec->ctdb;
1159         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1160
1161         /* get the nodemap for all active remote nodes and verify
1162            they are the same as for this node
1163          */
1164         for (j=0; j<nodemap->num; j++) {
1165                 struct ctdb_node_map *remote_nodemap=NULL;
1166                 int ret;
1167
1168                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1169                         continue;
1170                 }
1171                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1172                         continue;
1173                 }
1174
1175                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1176                                            mem_ctx, &remote_nodemap);
1177                 if (ret != 0) {
1178                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1179                                   nodemap->nodes[j].pnn));
1180                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1181                         talloc_free(mem_ctx);
1182                         return MONITOR_FAILED;
1183                 }
1184                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1185                         /* We should tell our daemon about this so it
1186                            updates its flags or else we will log the same 
1187                            message again in the next iteration of recovery.
1188                            Since we are the recovery master we can just as
1189                            well update the flags on all nodes.
1190                         */
1191                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1192                         if (ret != 0) {
1193                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1194                                 talloc_free(mem_ctx);
                                 return MONITOR_FAILED;
1195                         }
1196
1197                         /* Update our local copy of the flags in the recovery
1198                            daemon.
1199                         */
1200                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1201                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1202                                  nodemap->nodes[j].flags));
1203                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1204                 }
1205                 talloc_free(remote_nodemap);
1206         }
1207         talloc_free(mem_ctx);
1208         return MONITOR_OK;
1209 }
1210
1211
1212 /* Create a new random generation id. 
1213    The generation id can not be the INVALID_GENERATION id
1214 */
1215 static uint32_t new_generation(void)
1216 {
1217         uint32_t generation;
1218
1219         while (1) {
1220                 generation = random();
1221
1222                 if (generation != INVALID_GENERATION) {
1223                         break;
1224                 }
1225         }
1226
1227         return generation;
1228 }
1229
1230
1231 /*
1232   create a temporary working database
1233  */
1234 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1235 {
1236         char *name;
1237         struct tdb_wrap *recdb;
1238         unsigned tdb_flags;
1239
1240         /* open up the temporary recovery database */
1241         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1242                                ctdb->db_directory_state,
1243                                ctdb->pnn);
1244         if (name == NULL) {
1245                 return NULL;
1246         }
1247         unlink(name);
1248
1249         tdb_flags = TDB_NOLOCK;
1250         if (ctdb->valgrinding) {
1251                 tdb_flags |= TDB_NOMMAP;
1252         }
1253         tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
1254
1255         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1256                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1257         if (recdb == NULL) {
1258                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1259         }
1260
1261         talloc_free(name);
1262
1263         return recdb;
1264 }
1265
1266
1267 /* 
1268    a traverse function for pulling all relevant records from recdb
1269  */
1270 struct recdb_data {
1271         struct ctdb_context *ctdb;
1272         struct ctdb_marshall_buffer *recdata;
1273         uint32_t len;
1274         uint32_t allocated_len;
1275         bool failed;
1276         bool persistent;
1277 };
1278
1279 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1280 {
1281         struct recdb_data *params = (struct recdb_data *)p;
1282         struct ctdb_rec_data *rec;
1283         struct ctdb_ltdb_header *hdr;
1284
1285         /*
1286          * skip empty records - but NOT for persistent databases:
1287          *
1288          * The record-by-record mode of recovery deletes empty records.
1289          * For persistent databases, this can lead to data corruption
1290          * by deleting records that should be there:
1291          *
1292          * - Assume the cluster has been running for a while.
1293          *
1294          * - A record R in a persistent database has been created and
1295          *   deleted a couple of times, the last operation being deletion,
1296          *   leaving an empty record with a high RSN, say 10.
1297          *
1298          * - Now a node N is turned off.
1299          *
1300          * - This leaves the local copy of the database on node N with the empty
1301          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1302          *   the copy of record R.
1303          *
1304          * - Now the record is created again while node N is turned off.
1305          *   This creates R with RSN = 1 on all nodes except for N.
1306          *
1307          * - Now node N is turned on again. The following recovery will chose
1308          *   the older empty copy of R due to RSN 10 > RSN 1.
1309          *
1310          * ==> Hence the record is gone after the recovery.
1311          *
1312          * On databases like Samba's registry, this can damage the higher-level
1313          * data structures built from the various tdb-level records.
1314          */
1315         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1316                 return 0;
1317         }
1318
1319         /* update the dmaster field to point to us */
1320         hdr = (struct ctdb_ltdb_header *)data.dptr;
1321         if (!params->persistent) {
1322                 hdr->dmaster = params->ctdb->pnn;
1323                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1324         }
1325
1326         /* add the record to the blob ready to send to the nodes */
1327         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1328         if (rec == NULL) {
1329                 params->failed = true;
1330                 return -1;
1331         }
1332         if (params->len + rec->length >= params->allocated_len) {
1333                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1334                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1335         }
1336         if (params->recdata == NULL) {
1337                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1338                          rec->length + params->len));
1339                 params->failed = true;
1340                 return -1;
1341         }
1342         params->recdata->count++;
1343         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1344         params->len += rec->length;
1345         talloc_free(rec);
1346
1347         return 0;
1348 }
1349
1350 /*
1351   push the recdb database out to all nodes
1352  */
1353 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1354                                bool persistent,
1355                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1356 {
1357         struct recdb_data params;
1358         struct ctdb_marshall_buffer *recdata;
1359         TDB_DATA outdata;
1360         TALLOC_CTX *tmp_ctx;
1361         uint32_t *nodes;
1362
1363         tmp_ctx = talloc_new(ctdb);
1364         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1365
1366         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1367         CTDB_NO_MEMORY(ctdb, recdata);
1368
1369         recdata->db_id = dbid;
1370
1371         params.ctdb = ctdb;
1372         params.recdata = recdata;
1373         params.len = offsetof(struct ctdb_marshall_buffer, data);
1374         params.allocated_len = params.len;
1375         params.failed = false;
1376         params.persistent = persistent;
1377
1378         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1379                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1380                 talloc_free(params.recdata);
1381                 talloc_free(tmp_ctx);
1382                 return -1;
1383         }
1384
1385         if (params.failed) {
1386                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1387                 talloc_free(params.recdata);
1388                 talloc_free(tmp_ctx);
1389                 return -1;              
1390         }
1391
1392         recdata = params.recdata;
1393
1394         outdata.dptr = (void *)recdata;
1395         outdata.dsize = params.len;
1396
1397         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1398         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1399                                         nodes, 0,
1400                                         CONTROL_TIMEOUT(), false, outdata,
1401                                         NULL, NULL,
1402                                         NULL) != 0) {
1403                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1404                 talloc_free(recdata);
1405                 talloc_free(tmp_ctx);
1406                 return -1;
1407         }
1408
1409         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x (%u records)\n",
1410                   dbid, recdata->count));
1411
1412         talloc_free(recdata);
1413         talloc_free(tmp_ctx);
1414
1415         return 0;
1416 }
1417
1418
1419 /*
1420   go through a full recovery on one database 
1421  */
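/*
  The sequence is: build an empty recovery database with create_recdb(),
  pull the remote databases from all nodes into it, wipe the database
  on every node (safe because this happens inside the recovery
  transaction), and finally push the merged contents back out with
  push_recdb_database(), which also sets the dmaster on each record.
 */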
1422 static int recover_database(struct ctdb_recoverd *rec, 
1423                             TALLOC_CTX *mem_ctx,
1424                             uint32_t dbid,
1425                             bool persistent,
1426                             uint32_t pnn, 
1427                             struct ctdb_node_map *nodemap,
1428                             uint32_t transaction_id)
1429 {
1430         struct tdb_wrap *recdb;
1431         int ret;
1432         struct ctdb_context *ctdb = rec->ctdb;
1433         TDB_DATA data;
1434         struct ctdb_control_wipe_database w;
1435         uint32_t *nodes;
1436
1437         recdb = create_recdb(ctdb, mem_ctx);
1438         if (recdb == NULL) {
1439                 return -1;
1440         }
1441
1442         /* pull all remote databases onto the recdb */
1443         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1444         if (ret != 0) {
1445                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1446                 return -1;
1447         }
1448
1449         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1450
1451         /* wipe all the remote databases. This is safe as we are in a transaction */
1452         w.db_id = dbid;
1453         w.transaction_id = transaction_id;
1454
1455         data.dptr = (void *)&w;
1456         data.dsize = sizeof(w);
1457
1458         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1459         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1460                                         nodes, 0,
1461                                         CONTROL_TIMEOUT(), false, data,
1462                                         NULL, NULL,
1463                                         NULL) != 0) {
1464                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1465                 talloc_free(recdb);
1466                 return -1;
1467         }
1468         
1469         /* push out the correct database. This sets the dmaster and skips 
1470            the empty records */
1471         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1472         if (ret != 0) {
1473                 talloc_free(recdb);
1474                 return -1;
1475         }
1476
1477         /* all done with this database */
1478         talloc_free(recdb);
1479
1480         return 0;
1481 }
1482
1483 /*
1484   reload the nodes file 
1485 */
1486 static void reload_nodes_file(struct ctdb_context *ctdb)
1487 {
1488         ctdb->nodes = NULL;
1489         ctdb_load_nodes_file(ctdb);
1490 }
1491
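/*
  Refresh the cached lists of known and available public IPs for every
  active node in the cluster.  On failure *culprit is set to the node
  that could not be queried.  If IP verification is enabled (and
  takeover runs are not currently disabled) and a node's IP allocation
  looks wrong, a takeover run is flagged.
 */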
1492 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1493                                          struct ctdb_recoverd *rec,
1494                                          struct ctdb_node_map *nodemap,
1495                                          uint32_t *culprit)
1496 {
1497         int j;
1498         int ret;
1499
1500         if (ctdb->num_nodes != nodemap->num) {
1501                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1502                                   ctdb->num_nodes, nodemap->num));
1503                 if (culprit) {
1504                         *culprit = ctdb->pnn;
1505                 }
1506                 return -1;
1507         }
1508
1509         for (j=0; j<nodemap->num; j++) {
1510                 /* For readability */
1511                 struct ctdb_node *node = ctdb->nodes[j];
1512
1513                 /* release any existing data */
1514                 if (node->known_public_ips) {
1515                         talloc_free(node->known_public_ips);
1516                         node->known_public_ips = NULL;
1517                 }
1518                 if (node->available_public_ips) {
1519                         talloc_free(node->available_public_ips);
1520                         node->available_public_ips = NULL;
1521                 }
1522
1523                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1524                         continue;
1525                 }
1526
1527                 /* Retrieve the list of known public IPs from the node */
1528                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1529                                         CONTROL_TIMEOUT(),
1530                                         node->pnn,
1531                                         ctdb->nodes,
1532                                         0,
1533                                         &node->known_public_ips);
1534                 if (ret != 0) {
1535                         DEBUG(DEBUG_ERR,
1536                               ("Failed to read known public IPs from node: %u\n",
1537                                node->pnn));
1538                         if (culprit) {
1539                                 *culprit = node->pnn;
1540                         }
1541                         return -1;
1542                 }
1543
1544                 if (ctdb->do_checkpublicip &&
1545                     rec->takeover_runs_disable_ctx == NULL &&
1546                     verify_remote_ip_allocation(ctdb,
1547                                                  node->known_public_ips,
1548                                                  node->pnn)) {
1549                         DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
1550                         rec->need_takeover_run = true;
1551                 }
1552
1553                 /* Retrieve the list of available public IPs from the node */
1554                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1555                                         CONTROL_TIMEOUT(),
1556                                         node->pnn,
1557                                         ctdb->nodes,
1558                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1559                                         &node->available_public_ips);
1560                 if (ret != 0) {
1561                         DEBUG(DEBUG_ERR,
1562                               ("Failed to read available public IPs from node: %u\n",
1563                                node->pnn));
1564                         if (culprit) {
1565                                 *culprit = node->pnn;
1566                         }
1567                         return -1;
1568                 }
1569         }
1570
1571         return 0;
1572 }
1573
1574 /* when we start a recovery, make sure all nodes use the same reclock file
1575    setting
1576 */
1577 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1578 {
1579         struct ctdb_context *ctdb = rec->ctdb;
1580         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1581         TDB_DATA data;
1582         uint32_t *nodes;
1583
1584         if (ctdb->recovery_lock_file == NULL) {
1585                 data.dptr  = NULL;
1586                 data.dsize = 0;
1587         } else {
1588                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1589                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1590         }
1591
1592         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1593         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1594                                         nodes, 0,
1595                                         CONTROL_TIMEOUT(),
1596                                         false, data,
1597                                         NULL, NULL,
1598                                         rec) != 0) {
1599                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1600                 talloc_free(tmp_ctx);
1601                 return -1;
1602         }
1603
1604         talloc_free(tmp_ctx);
1605         return 0;
1606 }
1607
1608
1609 /*
1610  * This callback is called for every node that failed to execute ctdb_takeover_run().
1611  * If callback_data is set, the failing node is marked as culprit so it accumulates banning credits.
1612  */
1613 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1614 {
1615         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1616
1617         if (callback_data != NULL) {
1618                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1619
1620                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1621
1622                 ctdb_set_culprit(rec, node_pnn);
1623         }
1624 }
1625
1626
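/*
  Ban any node that has collected at least twice as many banning
  credits as there are nodes in the cluster, for recovery_ban_period
  seconds, and reset its credit count.  *self_ban is set if this node
  ends up banning itself; callers abort the current recovery in that
  case.
 */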
1627 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1628 {
1629         struct ctdb_context *ctdb = rec->ctdb;
1630         int i;
1631         struct ctdb_banning_state *ban_state;
1632
1633         *self_ban = false;
1634         for (i=0; i<ctdb->num_nodes; i++) {
1635                 if (ctdb->nodes[i]->ban_state == NULL) {
1636                         continue;
1637                 }
1638                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1639                 if (ban_state->count < 2*ctdb->num_nodes) {
1640                         continue;
1641                 }
1642
1643                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1644                         ctdb->nodes[i]->pnn, ban_state->count,
1645                         ctdb->tunable.recovery_ban_period));
1646                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1647                 ban_state->count = 0;
1648
1649                 /* Banning ourself? */
1650                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1651                         *self_ban = true;
1652                 }
1653         }
1654 }
1655
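/*
  Perform a public IP takeover run.  Each connected node is first told
  to suspend its own takeover runs for 60 seconds by sending a
  CTDB_SRVID_DISABLE_TAKEOVER_RUNS message (srvid 0 in the request
  means no reply is expected), then ctdb_takeover_run() reassigns the
  public IPs, and finally the same message is sent with a timeout of 0
  to re-enable takeover runs on the other nodes.  Returns true on
  success; on failure need_takeover_run is set so that the run is
  retried later.
 */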
1656 static bool do_takeover_run(struct ctdb_recoverd *rec,
1657                             struct ctdb_node_map *nodemap,
1658                             bool banning_credits_on_fail)
1659 {
1660         uint32_t *nodes = NULL;
1661         struct srvid_request dtr;
1662         TDB_DATA data;
1663         int i;
1664         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1665         int ret;
1666         bool ok;
1667
1668         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1669
1670         if (rec->takeover_run_in_progress) {
1671                 DEBUG(DEBUG_ERR, (__location__
1672                                   " takeover run already in progress\n"));
1673                 ok = false;
1674                 goto done;
1675         }
1676
1677         rec->takeover_run_in_progress = true;
1678
1679         /* If takeover runs are in disabled then fail... */
1680         if (rec->takeover_runs_disable_ctx != NULL) {
1681                 DEBUG(DEBUG_ERR,
1682                       ("Takeover runs are disabled so refusing to run one\n"));
1683                 ok = false;
1684                 goto done;
1685         }
1686
1687         /* Disable IP checks (takeover runs, really) on other nodes
1688          * while doing this takeover run.  This will stop those other
1689          * nodes from triggering takeover runs when they think they should
1690          * be hosting an IP but it isn't yet on an interface.  Don't
1691          * wait for replies since a failure here might cause some
1692          * noise in the logs but will not actually cause a problem.
1693          */
1694         dtr.srvid = 0; /* No reply */
1695         dtr.pnn = -1;
1696
1697         data.dptr  = (uint8_t*)&dtr;
1698         data.dsize = sizeof(dtr);
1699
1700         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1701
1702         /* Disable for 60 seconds.  This can be a tunable later if
1703          * necessary.
1704          */
1705         dtr.data = 60;
1706         for (i = 0; i < talloc_array_length(nodes); i++) {
1707                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1708                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1709                                              data) != 0) {
1710                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1711                 }
1712         }
1713
1714         ret = ctdb_takeover_run(rec->ctdb, nodemap,
1715                                 rec->force_rebalance_nodes,
1716                                 takeover_fail_callback,
1717                                 banning_credits_on_fail ? rec : NULL);
1718
1719         /* Reenable takeover runs and IP checks on other nodes */
1720         dtr.data = 0;
1721         for (i = 0; i < talloc_array_length(nodes); i++) {
1722                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1723                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1724                                              data) != 0) {
1725                         DEBUG(DEBUG_INFO,("Failed to reenable takeover runs\n"));
1726                 }
1727         }
1728
1729         if (ret != 0) {
1730                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1731                 ok = false;
1732                 goto done;
1733         }
1734
1735         ok = true;
1736         /* Takeover run was successful so clear force rebalance targets */
1737         if (rebalance_nodes == rec->force_rebalance_nodes) {
1738                 TALLOC_FREE(rec->force_rebalance_nodes);
1739         } else {
1740                 DEBUG(DEBUG_WARNING,
1741                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1742         }
1743 done:
1744         rec->need_takeover_run = !ok;
1745         talloc_free(nodes);
1746         rec->takeover_run_in_progress = false;
1747
1748         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1749         return ok;
1750 }
1751
1752
1753 /*
1754   we are the recmaster, and recovery is needed - start a recovery run
1755  */
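/*
  Rough sequence of a recovery run driven from here:
   - ban nodes that have collected too many banning credits
   - take the recovery lock if recovery lock checking is enabled
   - create any missing local and remote databases and update database
     priorities, then sync the recovery lock file setting
   - set recovery mode to ACTIVE everywhere and run the "startrecovery"
     event, then push our node flags to all nodes
   - start a transaction on all nodes, recover every database
     (pull, wipe, push) and commit the transaction
   - build a new vnnmap with a new generation number and distribute it,
     then set ourselves as recmaster and return to NORMAL recovery mode
   - refresh public IP information, perform a takeover run and run the
     "recovered" event before broadcasting CTDB_SRVID_RECONFIGURE
 */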
1756 static int do_recovery(struct ctdb_recoverd *rec, 
1757                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1758                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1759 {
1760         struct ctdb_context *ctdb = rec->ctdb;
1761         int i, j, ret;
1762         uint32_t generation;
1763         struct ctdb_dbid_map *dbmap;
1764         TDB_DATA data;
1765         uint32_t *nodes;
1766         struct timeval start_time;
1767         uint32_t culprit = (uint32_t)-1;
1768         bool self_ban;
1769
1770         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1771
1772         /* if recovery fails, force it again */
1773         rec->need_recovery = true;
1774
1775         ban_misbehaving_nodes(rec, &self_ban);
1776         if (self_ban) {
1777                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1778                 return -1;
1779         }
1780
1781         if (ctdb->tunable.verify_recovery_lock != 0) {
1782                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1783                 start_time = timeval_current();
1784                 if (!ctdb_recovery_lock(ctdb, true)) {
1785                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1786                                          "and ban ourself for %u seconds\n",
1787                                          ctdb->tunable.recovery_ban_period));
1788                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1789                         return -1;
1790                 }
1791                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1792                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1793         }
1794
1795         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1796
1797         /* get a list of all databases */
1798         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1799         if (ret != 0) {
1800                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1801                 return -1;
1802         }
1803
1804         /* we do the db creation before we set the recovery mode, so the freeze happens
1805            on all databases we will be dealing with. */
1806
1807         /* verify that we have all the databases any other node has */
1808         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1809         if (ret != 0) {
1810                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1811                 return -1;
1812         }
1813
1814         /* verify that all other nodes have all our databases */
1815         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1816         if (ret != 0) {
1817                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1818                 return -1;
1819         }
1820         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1821
1822         /* update the database priority for all remote databases */
1823         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1824         if (ret != 0) {
1825                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1826         }
1827         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1828
1829
1830         /* update all other nodes to use the same setting for reclock files
1831            as the local recovery master.
1832         */
1833         sync_recovery_lock_file_across_cluster(rec);
1834
1835         /* set recovery mode to active on all nodes */
1836         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1837         if (ret != 0) {
1838                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1839                 return -1;
1840         }
1841
1842         /* execute the "startrecovery" event script on all nodes */
1843         ret = run_startrecovery_eventscript(rec, nodemap);
1844         if (ret!=0) {
1845                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1846                 return -1;
1847         }
1848
1849         /*
1850           update all nodes to have the same flags that we have
1851          */
1852         for (i=0;i<nodemap->num;i++) {
1853                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1854                         continue;
1855                 }
1856
1857                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1858                 if (ret != 0) {
1859                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1860                         return -1;
1861                 }
1862         }
1863
1864         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1865
1866         /* pick a new generation number */
1867         generation = new_generation();
1868
1869         /* change the vnnmap on this node to use the new generation 
1870            number but not on any other nodes.
1871            this guarantees that if we abort the recovery prematurely
1872            for some reason (a node stops responding?)
1873            that we can just return immediately and we will reenter
1874            recovery shortly again.
1875            I.e. we deliberately leave the cluster with an inconsistent
1876            generation id to allow us to abort recovery at any stage and
1877            just restart it from scratch.
1878          */
1879         vnnmap->generation = generation;
1880         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1881         if (ret != 0) {
1882                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1883                 return -1;
1884         }
1885
1886         data.dptr = (void *)&generation;
1887         data.dsize = sizeof(uint32_t);
1888
1889         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1890         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1891                                         nodes, 0,
1892                                         CONTROL_TIMEOUT(), false, data,
1893                                         NULL,
1894                                         transaction_start_fail_callback,
1895                                         rec) != 0) {
1896                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1897                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1898                                         nodes, 0,
1899                                         CONTROL_TIMEOUT(), false, tdb_null,
1900                                         NULL,
1901                                         NULL,
1902                                         NULL) != 0) {
1903                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1904                 }
1905                 return -1;
1906         }
1907
1908         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1909
1910         for (i=0;i<dbmap->num;i++) {
1911                 ret = recover_database(rec, mem_ctx,
1912                                        dbmap->dbs[i].dbid,
1913                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1914                                        pnn, nodemap, generation);
1915                 if (ret != 0) {
1916                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1917                         return -1;
1918                 }
1919         }
1920
1921         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1922
1923         /* commit all the changes */
1924         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1925                                         nodes, 0,
1926                                         CONTROL_TIMEOUT(), false, data,
1927                                         NULL, NULL,
1928                                         NULL) != 0) {
1929                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1930                 return -1;
1931         }
1932
1933         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1934         
1935
1936         /* update the capabilities for all nodes */
1937         ret = update_capabilities(ctdb, nodemap);
1938         if (ret!=0) {
1939                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1940                 return -1;
1941         }
1942
1943         /* build a new vnn map with all the currently active and
1944            unbanned nodes */
1945         generation = new_generation();
1946         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1947         CTDB_NO_MEMORY(ctdb, vnnmap);
1948         vnnmap->generation = generation;
1949         vnnmap->size = 0;
1950         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1951         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1952         for (i=j=0;i<nodemap->num;i++) {
1953                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1954                         continue;
1955                 }
1956                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1957                         /* this node cannot be an lmaster */
1958                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an lmaster, skipping it\n", i));
1959                         continue;
1960                 }
1961
1962                 vnnmap->size++;
1963                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1964                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1965                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1966
1967         }
1968         if (vnnmap->size == 0) {
1969                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1970                 vnnmap->size++;
1971                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1972                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1973                 vnnmap->map[0] = pnn;
1974         }       
1975
1976         /* update to the new vnnmap on all nodes */
1977         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1978         if (ret != 0) {
1979                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1980                 return -1;
1981         }
1982
1983         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1984
1985         /* update recmaster to point to us for all nodes */
1986         ret = set_recovery_master(ctdb, nodemap, pnn);
1987         if (ret!=0) {
1988                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1989                 return -1;
1990         }
1991
1992         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1993
1994         /*
1995           update all nodes to have the same flags that we have
1996          */
1997         for (i=0;i<nodemap->num;i++) {
1998                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1999                         continue;
2000                 }
2001
2002                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
2003                 if (ret != 0) {
2004                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
2005                         return -1;
2006                 }
2007         }
2008
2009         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
2010
2011         /* disable recovery mode */
2012         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
2013         if (ret != 0) {
2014                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
2015                 return -1;
2016         }
2017
2018         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
2019
2020         /* Fetch known/available public IPs from each active node */
2021         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
2022         if (ret != 0) {
2023                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2024                                  culprit));
2025                 rec->need_takeover_run = true;
2026                 return -1;
2027         }
2028
2029         do_takeover_run(rec, nodemap, false);
2030
2031         /* execute the "recovered" event script on all nodes */
2032         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
2033         if (ret!=0) {
2034                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
2035                 return -1;
2036         }
2037
2038         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
2039
2040         /* send a message to all clients telling them that the cluster 
2041            has been reconfigured */
2042         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
2043
2044         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
2045
2046         rec->need_recovery = false;
2047
2048         /* we managed to complete a full recovery, make sure to forgive
2049            any past sins by the nodes that could now participate in the
2050            recovery.
2051         */
2052         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
2053         for (i=0;i<nodemap->num;i++) {
2054                 struct ctdb_banning_state *ban_state;
2055
2056                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2057                         continue;
2058                 }
2059
2060                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
2061                 if (ban_state == NULL) {
2062                         continue;
2063                 }
2064
2065                 ban_state->count = 0;
2066         }
2067
2068
2069         /* We just finished a recovery successfully. 
2070            We now wait for rerecovery_timeout before we allow 
2071            another recovery to take place.
2072         */
2073         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
2074         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
2075         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
2076
2077         return 0;
2078 }
2079
2080
2081 /*
2082   elections are won by first checking the number of connected nodes, then
2083   the priority time, then the pnn
2084  */
2085 struct election_message {
2086         uint32_t num_connected;
2087         struct timeval priority_time;
2088         uint32_t pnn;
2089         uint32_t node_flags;
2090 };
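/*
  ctdb_election_win() below compares these fields in that order: the
  node with more connected nodes wins, an earlier priority_time
  (normally the longest-running recovery daemon) breaks a tie, and the
  PNN is the final tie-breaker.  A node that is banned or stopped never
  wins, and a node without the recmaster capability deliberately
  handicaps its own election data.
 */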
2091
2092 /*
2093   form this node's election data
2094  */
2095 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2096 {
2097         int ret, i;
2098         struct ctdb_node_map *nodemap;
2099         struct ctdb_context *ctdb = rec->ctdb;
2100
2101         ZERO_STRUCTP(em);
2102
2103         em->pnn = rec->ctdb->pnn;
2104         em->priority_time = rec->priority_time;
2105
2106         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2107         if (ret != 0) {
2108                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
2109                 return;
2110         }
2111
2112         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2113         em->node_flags = rec->node_flags;
2114
2115         for (i=0;i<nodemap->num;i++) {
2116                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2117                         em->num_connected++;
2118                 }
2119         }
2120
2121         /* we shouldn't try to win this election if we can't be a recmaster */
2122         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2123                 em->num_connected = 0;
2124                 em->priority_time = timeval_current();
2125         }
2126
2127         talloc_free(nodemap);
2128 }
2129
2130 /*
2131   see if the given election data wins
2132  */
2133 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2134 {
2135         struct election_message myem;
2136         int cmp = 0;
2137
2138         ctdb_election_data(rec, &myem);
2139
2140         /* we can't win if we don't have the recmaster capability */
2141         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2142                 return false;
2143         }
2144
2145         /* we can't win if we are banned */
2146         if (rec->node_flags & NODE_FLAGS_BANNED) {
2147                 return false;
2148         }
2149
2150         /* we can't win if we are stopped */
2151         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2152                 return false;
2153         }
2154
2155         /* we will automatically win if the other node is banned */
2156         if (em->node_flags & NODE_FLAGS_BANNED) {
2157                 return true;
2158         }
2159
2160         /* we will automatically win if the other node is stopped */
2161         if (em->node_flags & NODE_FLAGS_STOPPED) {
2162                 return true;
2163         }
2164
2165         /* try to use the most connected node */
2166         if (cmp == 0) {
2167                 cmp = (int)myem.num_connected - (int)em->num_connected;
2168         }
2169
2170         /* then the longest running node */
2171         if (cmp == 0) {
2172                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2173         }
2174
2175         if (cmp == 0) {
2176                 cmp = (int)myem.pnn - (int)em->pnn;
2177         }
2178
2179         return cmp > 0;
2180 }
2181
2182 /*
2183   send out an election request
2184  */
2185 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
2186 {
2187         int ret;
2188         TDB_DATA election_data;
2189         struct election_message emsg;
2190         uint64_t srvid;
2191         struct ctdb_context *ctdb = rec->ctdb;
2192
2193         srvid = CTDB_SRVID_RECOVERY;
2194
2195         ctdb_election_data(rec, &emsg);
2196
2197         election_data.dsize = sizeof(struct election_message);
2198         election_data.dptr  = (unsigned char *)&emsg;
2199
2200
2201         /* send an election message to all active nodes */
2202         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2203         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2204
2205
2206         /* A new node that is already frozen has entered the cluster.
2207            The existing nodes are not frozen and don't need to be frozen
2208            until the election has ended and we start the actual recovery
2209         */
2210         if (update_recmaster == true) {
2211                 /* first we assume we will win the election and set 
2212                    recoverymaster to be ourself on the current node
2213                  */
2214                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2215                 if (ret != 0) {
2216                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2217                         return -1;
2218                 }
2219         }
2220
2221
2222         return 0;
2223 }
2224
2225 /*
2226   this function will unban all nodes in the cluster
2227 */
2228 static void unban_all_nodes(struct ctdb_context *ctdb)
2229 {
2230         int ret, i;
2231         struct ctdb_node_map *nodemap;
2232         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2233         
2234         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2235         if (ret != 0) {
2236                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2237                 return;
2238         }
2239
2240         for (i=0;i<nodemap->num;i++) {
2241                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2242                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2243                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
2244                 }
2245         }
2246
2247         talloc_free(tmp_ctx);
2248 }
2249
2250
2251 /*
2252   we think we are winning the election - send a broadcast election request
2253  */
2254 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2255 {
2256         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2257         int ret;
2258
2259         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
2260         if (ret != 0) {
2261                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2262         }
2263
2264         talloc_free(rec->send_election_te);
2265         rec->send_election_te = NULL;
2266 }
2267
2268 /*
2269   handler for memory dumps
2270 */
2271 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2272                              TDB_DATA data, void *private_data)
2273 {
2274         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2275         TDB_DATA *dump;
2276         int ret;
2277         struct srvid_request *rd;
2278
2279         if (data.dsize != sizeof(struct srvid_request)) {
2280                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2281                 talloc_free(tmp_ctx);
2282                 return;
2283         }
2284         rd = (struct srvid_request *)data.dptr;
2285
2286         dump = talloc_zero(tmp_ctx, TDB_DATA);
2287         if (dump == NULL) {
2288                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2289                 talloc_free(tmp_ctx);
2290                 return;
2291         }
2292         ret = ctdb_dump_memory(ctdb, dump);
2293         if (ret != 0) {
2294                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2295                 talloc_free(tmp_ctx);
2296                 return;
2297         }
2298
2299         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
2300
2301         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2302         if (ret != 0) {
2303                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2304                 talloc_free(tmp_ctx);
2305                 return;
2306         }
2307
2308         talloc_free(tmp_ctx);
2309 }
2310
2311 /*
2312   handler for getlog
2313 */
2314 static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2315                            TDB_DATA data, void *private_data)
2316 {
2317         struct ctdb_get_log_addr *log_addr;
2318         pid_t child;
2319
2320         if (data.dsize != sizeof(struct ctdb_get_log_addr)) {
2321                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2322                 return;
2323         }
2324         log_addr = (struct ctdb_get_log_addr *)data.dptr;
2325
2326         child = ctdb_fork_no_free_ringbuffer(ctdb);
2327         if (child == (pid_t)-1) {
2328                 DEBUG(DEBUG_ERR,("Failed to fork a log collector child\n"));
2329                 return;
2330         }
2331
2332         if (child == 0) {
2333                 ctdb_set_process_name("ctdb_rec_log_collector");
2334                 if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
2335                         DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch log collector child into client mode.\n"));
2336                         _exit(1);
2337                 }
2338                 ctdb_collect_log(ctdb, log_addr);
2339                 _exit(0);
2340         }
2341 }
2342
2343 /*
2344   handler for clearlog
2345 */
2346 static void clearlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2347                              TDB_DATA data, void *private_data)
2348 {
2349         ctdb_clear_log(ctdb);
2350 }
2351
2352 /*
2353   handler for reload_nodes
2354 */
2355 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2356                              TDB_DATA data, void *private_data)
2357 {
2358         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2359
2360         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2361
2362         reload_nodes_file(rec->ctdb);
2363 }
2364
2365
2366 static void ctdb_rebalance_timeout(struct event_context *ev,
2367                                    struct timed_event *te,
2368                                    struct timeval t, void *p)
2369 {
2370         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2371
2372         if (rec->force_rebalance_nodes == NULL) {
2373                 DEBUG(DEBUG_ERR,
2374                       ("Rebalance timeout occurred - no nodes to rebalance\n"));
2375                 return;
2376         }
2377
2378         DEBUG(DEBUG_NOTICE,
2379               ("Rebalance timeout occurred - do takeover run\n"));
2380         do_takeover_run(rec, rec->nodemap, false);
2381 }
2382
2383         
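/*
  Handler for deferred IP rebalancing.  The message carries the PNN of
  a node (typically one that has just been added) that should become a
  rebalance target: the PNN is appended to rec->force_rebalance_nodes
  and a timer of deferred_rebalance_on_node_add seconds is armed.  When
  it fires, ctdb_rebalance_timeout() above triggers a takeover run
  towards the recorded nodes.  Only the current recovery master acts on
  this message.
 */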
2384 static void recd_node_rebalance_handler(struct ctdb_context *ctdb,
2385                                         uint64_t srvid,
2386                                         TDB_DATA data, void *private_data)
2387 {
2388         uint32_t pnn;
2389         uint32_t *t;
2390         int len;
2391         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2392
2393         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
2394                 return;
2395         }
2396
2397         if (data.dsize != sizeof(uint32_t)) {
2398                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2399                 return;
2400         }
2401
2402         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2403                 return;
2404         }
2405
2406         pnn = *(uint32_t *)&data.dptr[0];
2407
2408         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
2409
2410         /* Copy any existing list of nodes.  There's probably some
2411          * sort of realloc variant that will do this but we need to
2412          * make sure that freeing the old array also cancels the timer
2413          * event for the timeout... not sure if realloc will do that.
2414          */
2415         len = (rec->force_rebalance_nodes != NULL) ?
2416                 talloc_array_length(rec->force_rebalance_nodes) :
2417                 0;
2418
2419         /* This allows duplicates to be added but they don't cause
2420          * harm.  A call to add a duplicate PNN arguably means that
2421          * the timeout should be reset, so this is the simplest
2422          * solution.
2423          */
2424         t = talloc_zero_array(rec, uint32_t, len+1);
2425         CTDB_NO_MEMORY_VOID(ctdb, t);
2426         if (len > 0) {
2427                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
2428         }
2429         t[len] = pnn;
2430
2431         talloc_free(rec->force_rebalance_nodes);
2432
2433         rec->force_rebalance_nodes = t;
2434         event_add_timed(ctdb->ev, rec->force_rebalance_nodes,
2435                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2436                         ctdb_rebalance_timeout, rec);
2437 }
2438
2439
2440
2441 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2442                              TDB_DATA data, void *private_data)
2443 {
2444         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2445         struct ctdb_public_ip *ip;
2446
2447         if (rec->recmaster != rec->ctdb->pnn) {
2448                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2449                 return;
2450         }
2451
2452         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2453                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2454                 return;
2455         }
2456
2457         ip = (struct ctdb_public_ip *)data.dptr;
2458
2459         update_ip_assignment_tree(rec->ctdb, ip);
2460 }
2461
2462
2463 static void clear_takeover_runs_disable(struct ctdb_recoverd *rec)
2464 {
2465         TALLOC_FREE(rec->takeover_runs_disable_ctx);
2466 }
2467
2468 static void reenable_takeover_runs(struct event_context *ev,
2469                                    struct timed_event *te,
2470                                    struct timeval yt, void *p)
2471 {
2472         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2473
2474         DEBUG(DEBUG_NOTICE,("Reenabling takeover runs after timeout\n"));
2475         clear_takeover_runs_disable(rec);
2476 }
2477
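/*
  Handler for CTDB_SRVID_DISABLE_TAKEOVER_RUNS.  The request carries a
  timeout in seconds: a non-zero value disables takeover runs on this
  node for that long (the disable state and its timer both live on
  rec->takeover_runs_disable_ctx), while a timeout of 0 re-enables them
  immediately.  If the requester supplied a reply srvid it is sent this
  node's PNN on success or a negative errno value on failure.

  A minimal sketch of the sending side, assuming a client context
  "ctdb", a hypothetical destination "destnode" and no reply wanted:

    struct srvid_request req = { .pnn = -1, .srvid = 0, .data = 30 };
    TDB_DATA d = { .dptr = (uint8_t *)&req, .dsize = sizeof(req) };
    ctdb_client_send_message(ctdb, destnode,
                             CTDB_SRVID_DISABLE_TAKEOVER_RUNS, d);
 */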
2478 static void disable_takeover_runs_handler(struct ctdb_context *ctdb,
2479                                           uint64_t srvid, TDB_DATA data,
2480                                           void *private_data)
2481 {
2482         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2483                                                     struct ctdb_recoverd);
2484         struct srvid_request *r;
2485         uint32_t timeout;
2486         TDB_DATA result;
2487         int32_t ret = 0;
2488
2489         /* Validate input data */
2490         if (data.dsize != sizeof(struct srvid_request)) {
2491                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2492                                  "expecting %lu\n", (long unsigned)data.dsize,
2493                                  (long unsigned)sizeof(struct srvid_request)));
2494                 /* No valid request so there is nowhere to send a reply */
2495                 return;
2496         }
2497         if (data.dptr == NULL) {
2498                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2499                 /* No valid request so there is nowhere to send a reply */
2500                 return;
2501         }
2502
2503         r = (struct srvid_request *)data.dptr;
2504         timeout = r->data;
2505
2506         if (timeout == 0) {
2507                 DEBUG(DEBUG_NOTICE,("Reenabling takeover runs\n"));
2508                 clear_takeover_runs_disable(rec);
2509                 ret = ctdb_get_pnn(ctdb);
2510                 goto done;
2511         }
2512
2513         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
2514                 DEBUG(DEBUG_ERR,
2515                       ("Refusing to disable takeover runs on inactive node\n"));
2516                 ret = -EHOSTDOWN;
2517                 goto done;
2518         }
2519
2520         if (rec->takeover_run_in_progress) {
2521                 DEBUG(DEBUG_ERR,
2522                       ("Unable to disable takeover runs - in progress\n"));
2523                 ret = -EAGAIN;
2524                 goto done;
2525         }
2526
2527         DEBUG(DEBUG_NOTICE,("Disabling takeover runs for %u seconds\n", timeout));
2528
2529         /* Clear any old timers */
2530         clear_takeover_runs_disable(rec);
2531
2532         /* When this is non-NULL it indicates that takeover runs are
2533          * disabled.  This context also holds the timeout timer.
2534          */
2535         rec->takeover_runs_disable_ctx = talloc_new(rec);
2536         if (rec->takeover_runs_disable_ctx == NULL) {
2537                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate memory\n"));
2538                 ret = -ENOMEM;
2539                 goto done;
2540         }
2541
2542         /* Arrange for the timeout to occur */
2543         event_add_timed(ctdb->ev, rec->takeover_runs_disable_ctx,
2544                         timeval_current_ofs(timeout, 0),
2545                         reenable_takeover_runs,
2546                         rec);
2547
2548         /* Returning our PNN tells the caller that we succeeded */
2549         ret = ctdb_get_pnn(ctdb);
2550 done:
2551         result.dsize = sizeof(int32_t);
2552         result.dptr  = (uint8_t *)&ret;
2553         srvid_request_reply(ctdb, r, result);
2554 }
2555
2556 /* Backward compatibility for this SRVID - call
2557  * disable_takeover_runs_handler() instead
2558  */
2559 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid,
2560                                      TDB_DATA data, void *private_data)
2561 {
2562         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2563                                                     struct ctdb_recoverd);
2564         TDB_DATA data2;
2565         struct srvid_request *req;
2566
2567         if (data.dsize != sizeof(uint32_t)) {
2568                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2569                                  "expecting %lu\n", (long unsigned)data.dsize,
2570                                  (long unsigned)sizeof(uint32_t)));
2571                 return;
2572         }
2573         if (data.dptr == NULL) {
2574                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2575                 return;
2576         }
2577
2578         req = talloc(ctdb, struct srvid_request);
2579         CTDB_NO_MEMORY_VOID(ctdb, req);
2580
2581         req->srvid = 0; /* No reply */
2582         req->pnn = -1;
2583         req->data = *((uint32_t *)data.dptr); /* Timeout */
2584
2585         data2.dsize = sizeof(*req);
2586         data2.dptr = (uint8_t *)req;
2587
2588         disable_takeover_runs_handler(rec->ctdb,
2589                                       CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
2590                                       data2, rec);
2591 }
2592
2593 /*
2594   handler for ip reallocate, just add it to the list of requests and 
2595   handle this later in the monitor_cluster loop so we do not recurse
2596   with other requests to takeover_run()
2597 */
2598 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
2599                                   TDB_DATA data, void *private_data)
2600 {
2601         struct srvid_request *request;
2602         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2603                                                     struct ctdb_recoverd);
2604
2605         if (data.dsize != sizeof(struct srvid_request)) {
2606                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2607                 return;
2608         }
2609
2610         request = (struct srvid_request *)data.dptr;
2611
2612         srvid_request_add(ctdb, &rec->reallocate_requests, request);
2613 }
2614
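/*
  Process all queued ip reallocate requests in one go: refresh the
  public IP information from every node, attempt a single takeover run
  and then answer every queued request with this node's PNN on success
  or a negative value on failure.
 */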
2615 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2616                                           struct ctdb_recoverd *rec)
2617 {
2618         TDB_DATA result;
2619         int32_t ret;
2620         uint32_t culprit;
2621
2622         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2623
2624         /* update the list of public ips that a node can handle for
2625            all connected nodes
2626         */
2627         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2628         if (ret != 0) {
2629                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2630                                  culprit));
2631                 rec->need_takeover_run = true;
2632         }
2633         if (ret == 0) {
2634                 if (do_takeover_run(rec, rec->nodemap, false)) {
2635                         ret = ctdb_get_pnn(ctdb);
2636                 } else {
2637                         ret = -1;
2638                 }
2639         }
2640
2641         result.dsize = sizeof(int32_t);
2642         result.dptr  = (uint8_t *)&ret;
2643
2644         srvid_requests_reply(ctdb, &rec->reallocate_requests, result);
2645 }
2646
2647
2648 /*
2649   handler for recovery master elections
2650 */
2651 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2652                              TDB_DATA data, void *private_data)
2653 {
2654         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2655         int ret;
2656         struct election_message *em = (struct election_message *)data.dptr;
2657         TALLOC_CTX *mem_ctx;
2658
2659         /* we got an election packet - update the timeout for the election */
2660         talloc_free(rec->election_timeout);
2661         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2662                                                 fast_start ?
2663                                                 timeval_current_ofs(0, 500000) :
2664                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2665                                                 ctdb_election_timeout, rec);
2666
2667         mem_ctx = talloc_new(ctdb);
2668
2669         /* someone called an election. check their election data
2670            and if we disagree and we would rather be the elected node, 
2671            send a new election message to all other nodes
2672          */
2673         if (ctdb_election_win(rec, em)) {
2674                 if (!rec->send_election_te) {
2675                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2676                                                                 timeval_current_ofs(0, 500000),
2677                                                                 election_send_request, rec);
2678                 }
2679                 talloc_free(mem_ctx);
2680                 /*unban_all_nodes(ctdb);*/
2681                 return;
2682         }
2683         
2684         /* we didn't win */
2685         talloc_free(rec->send_election_te);
2686         rec->send_election_te = NULL;
2687
2688         if (ctdb->tunable.verify_recovery_lock != 0) {
2689                 /* release the recmaster lock */
2690                 if (em->pnn != ctdb->pnn &&
2691                     ctdb->recovery_lock_fd != -1) {
2692                         close(ctdb->recovery_lock_fd);
2693                         ctdb->recovery_lock_fd = -1;
2694                         unban_all_nodes(ctdb);
2695                 }
2696         }
2697
2698         /* ok, let that guy become recmaster then */
2699         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2700         if (ret != 0) {
2701                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2702                 talloc_free(mem_ctx);
2703                 return;
2704         }
2705
2706         talloc_free(mem_ctx);
2707         return;
2708 }
2709
2710
2711 /*
2712   force the start of the election process
2713  */
2714 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2715                            struct ctdb_node_map *nodemap)
2716 {
2717         int ret;
2718         struct ctdb_context *ctdb = rec->ctdb;
2719
2720         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2721
2722         /* set all nodes to recovery mode to stop all internode traffic */
2723         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2724         if (ret != 0) {
2725                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2726                 return;
2727         }
2728
2729         talloc_free(rec->election_timeout);
2730         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2731                                                 fast_start ?
2732                                                 timeval_current_ofs(0, 500000) :
2733                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2734                                                 ctdb_election_timeout, rec);
2735
2736         ret = send_election_request(rec, pnn, true);
2737         if (ret!=0) {
2738                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2739                 return;
2740         }
2741
2742         /* wait for a few seconds to collect all responses */
2743         ctdb_wait_election(rec);
2744 }
2745
2746
2747
2748 /*
2749   handler for when a node changes its flags
2750 */
2751 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2752                             TDB_DATA data, void *private_data)
2753 {
2754         int ret;
2755         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2756         struct ctdb_node_map *nodemap=NULL;
2757         TALLOC_CTX *tmp_ctx;
2758         int i;
2759         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2760         int disabled_flag_changed;
2761
2762         if (data.dsize != sizeof(*c)) {
2763                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2764                 return;
2765         }
2766
2767         tmp_ctx = talloc_new(ctdb);
2768         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2769
2770         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2771         if (ret != 0) {
2772                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2773                 talloc_free(tmp_ctx);
2774                 return;         
2775         }
2776
2777
2778         for (i=0;i<nodemap->num;i++) {
2779                 if (nodemap->nodes[i].pnn == c->pnn) break;
2780         }
2781
2782         if (i == nodemap->num) {
2783                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2784                 talloc_free(tmp_ctx);
2785                 return;
2786         }
2787
2788         if (c->old_flags != c->new_flags) {
2789                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2790         }
2791
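        /* Note whether the DISABLED bits (permanently disabled or
         * unhealthy) differ from our cached flags - only such a change
         * can require an ip takeover run further down.
         */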
2792         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2793
2794         nodemap->nodes[i].flags = c->new_flags;
2795
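        /* Fetch the current recmaster and recovery mode: a takeover
         * run is only scheduled below if we are the recmaster and no
         * recovery is in progress.
         */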
2796         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2797                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2798
2799         if (ret == 0) {
2800                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2801                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2802         }
2803         
2804         if (ret == 0 &&
2805             ctdb->recovery_master == ctdb->pnn &&
2806             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2807                 /* Only do the takeover run if the permanently disabled or
2808                    unhealthy flags changed, since these cause an ip failover
2809                    but not a recovery.
2810                    If the node became disconnected or banned this will also
2811                    lead to an ip address failover, but that is handled
2812                    during recovery.
2813                 */
2814                 if (disabled_flag_changed) {
2815                         rec->need_takeover_run = true;
2816                 }
2817         }
2818
2819         talloc_free(tmp_ctx);
2820 }
2821
2822 /*
2823   handler for when we need to push out flag changes to all other nodes
2824 */
2825 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2826                             TDB_DATA data, void *private_data)
2827 {
2828         int ret;
2829         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2830         struct ctdb_node_map *nodemap=NULL;
2831         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2832         uint32_t recmaster;
2833         uint32_t *nodes;
2834
2835         /* find the recovery master */
2836         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2837         if (ret != 0) {
2838                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2839                 talloc_free(tmp_ctx);
2840                 return;
2841         }
2842
2843         /* read the node flags from the recmaster */
2844         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2845         if (ret != 0) {
2846                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", recmaster));
2847                 talloc_free(tmp_ctx);
2848                 return;
2849         }
2850         if (c->pnn >= nodemap->num) {
2851                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2852                 talloc_free(tmp_ctx);
2853                 return;
2854         }
2855
2856         /* send the flags update to all connected nodes */
2857         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2858
2859         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2860                                       nodes, 0, CONTROL_TIMEOUT(),
2861                                       false, data,
2862                                       NULL, NULL,
2863                                       NULL) != 0) {
2864                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2865
2866                 talloc_free(tmp_ctx);
2867                 return;
2868         }
2869
2870         talloc_free(tmp_ctx);
2871 }
2872
2873
2874 struct verify_recmode_normal_data {
2875         uint32_t count;
2876         enum monitor_result status;
2877 };
2878
2879 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2880 {
2881         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2882
2883
2884         /* one more node has responded with recmode data*/
2885         rmdata->count--;
2886
2887         /* if we failed to get the recmode, then return an error and let
2888            the main loop try again.
2889         */
2890         if (state->state != CTDB_CONTROL_DONE) {
2891                 if (rmdata->status == MONITOR_OK) {
2892                         rmdata->status = MONITOR_FAILED;
2893                 }
2894                 return;
2895         }
2896
2897         /* if we got a response, then the recmode will be stored in the
2898            status field
2899         */
2900         if (state->status != CTDB_RECOVERY_NORMAL) {
2901                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2902                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2903         }
2904
2905         return;
2906 }
2907
2908
2909 /* verify that all nodes are in normal recovery mode */
2910 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2911 {
2912         struct verify_recmode_normal_data *rmdata;
2913         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2914         struct ctdb_client_control_state *state;
2915         enum monitor_result status;
2916         int j;
2917         
2918         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2919         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2920         rmdata->count  = 0;
2921         rmdata->status = MONITOR_OK;
2922
2923         /* loop over all active nodes and send an async getrecmode call to 
2924            them*/
2925         for (j=0; j<nodemap->num; j++) {
2926                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2927                         continue;
2928                 }
2929                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2930                                         CONTROL_TIMEOUT(), 
2931                                         nodemap->nodes[j].pnn);
2932                 if (state == NULL) {
2933                         /* we failed to send the control, treat this as 
2934                            an error and try again next iteration
2935                         */                      
2936                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2937                         talloc_free(mem_ctx);
2938                         return MONITOR_FAILED;
2939                 }
2940
2941                 /* set up the callback functions */
2942                 state->async.fn = verify_recmode_normal_callback;
2943                 state->async.private_data = rmdata;
2944
2945                 /* one more control to wait for to complete */
2946                 rmdata->count++;
2947         }
2948
2949
2950         /* now wait for up to the maximum number of seconds allowed
2951            or until all nodes we expect a response from have replied
2952         */
2953         while (rmdata->count > 0) {
2954                 event_loop_once(ctdb->ev);
2955         }
2956
2957         status = rmdata->status;
2958         talloc_free(mem_ctx);
2959         return status;
2960 }
2961
2962
2963 struct verify_recmaster_data {
2964         struct ctdb_recoverd *rec;
2965         uint32_t count;
2966         uint32_t pnn;
2967         enum monitor_result status;
2968 };
2969
2970 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2971 {
2972         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2973
2974
2975         /* one more node has responded with recmaster data*/
2976         rmdata->count--;
2977
2978         /* if we failed to get the recmaster, then return an error and let
2979            the main loop try again.
2980         */
2981         if (state->state != CTDB_CONTROL_DONE) {
2982                 if (rmdata->status == MONITOR_OK) {
2983                         rmdata->status = MONITOR_FAILED;
2984                 }
2985                 return;
2986         }
2987
2988         /* if we got a response, then the recmaster will be stored in the
2989            status field
2990         */
2991         if (state->status != rmdata->pnn) {
2992                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2993                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2994                 rmdata->status = MONITOR_ELECTION_NEEDED;
2995         }
2996
2997         return;
2998 }
2999
3000
3001 /* verify that all nodes agree that we are the recmaster */
3002 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
3003 {
3004         struct ctdb_context *ctdb = rec->ctdb;
3005         struct verify_recmaster_data *rmdata;
3006         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3007         struct ctdb_client_control_state *state;
3008         enum monitor_result status;
3009         int j;
3010         
3011         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
3012         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3013         rmdata->rec    = rec;
3014         rmdata->count  = 0;
3015         rmdata->pnn    = pnn;
3016         rmdata->status = MONITOR_OK;
3017
3018         /* loop over all active nodes and send an async getrecmaster call to 
3019            them*/
3020         for (j=0; j<nodemap->num; j++) {
3021                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3022                         continue;
3023                 }
3024                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
3025                                         CONTROL_TIMEOUT(),
3026                                         nodemap->nodes[j].pnn);
3027                 if (state == NULL) {
3028                         /* we failed to send the control, treat this as 
3029                            an error and try again next iteration
3030                         */                      
3031                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
3032                         talloc_free(mem_ctx);
3033                         return MONITOR_FAILED;
3034                 }
3035
3036                 /* set up the callback functions */
3037                 state->async.fn = verify_recmaster_callback;
3038                 state->async.private_data = rmdata;
3039
3040                 /* one more control to wait for to complete */
3041                 rmdata->count++;
3042         }
3043
3044
3045         /* now wait for up to the maximum number of seconds allowed
3046            or until all nodes we expect a response from have replied
3047         */
3048         while (rmdata->count > 0) {
3049                 event_loop_once(ctdb->ev);
3050         }
3051
3052         status = rmdata->status;
3053         talloc_free(mem_ctx);
3054         return status;
3055 }
3056
3057 static bool interfaces_have_changed(struct ctdb_context *ctdb,
3058                                     struct ctdb_recoverd *rec)
3059 {
3060         struct ctdb_control_get_ifaces *ifaces = NULL;
3061         TALLOC_CTX *mem_ctx;
3062         bool ret = false;
3063
3064         mem_ctx = talloc_new(NULL);
3065
3066         /* Read the interfaces from the local node */
3067         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
3068                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
3069                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
3070                 /* We could return an error.  However, this will be
3071                  * rare so we'll decide that the interfaces have
3072                  * actually changed, just in case.
3073                  */
3074                 talloc_free(mem_ctx);
3075                 return true;
3076         }
3077
3078         if (!rec->ifaces) {
3079                 /* We haven't been here before so things have changed */
3080                 DEBUG(DEBUG_NOTICE, ("Initial interfaces fetched\n"));
3081                 ret = true;
3082         } else if (rec->ifaces->num != ifaces->num) {
3083                 /* Number of interfaces has changed */
3084                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3085                                      rec->ifaces->num, ifaces->num));
3086                 ret = true;
3087         } else {
3088                 /* See if interface names or link states have changed */
3089                 int i;
3090                 for (i = 0; i < rec->ifaces->num; i++) {
3091                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
3092                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3093                                 DEBUG(DEBUG_NOTICE,
3094                                       ("Interface in slot %d changed: %s => %s\n",
3095                                        i, iface->name, ifaces->ifaces[i].name));
3096                                 ret = true;
3097                                 break;
3098                         }
3099                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3100                                 DEBUG(DEBUG_NOTICE,
3101                                       ("Interface %s changed state: %d => %d\n",
3102                                        iface->name, iface->link_state,
3103                                        ifaces->ifaces[i].link_state));
3104                                 ret = true;
3105                                 break;
3106                         }
3107                 }
3108         }
3109
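        /* Cache the interface list we just fetched on the recovery
         * daemon context so the next call can compare against it.
         */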
3110         talloc_free(rec->ifaces);
3111         rec->ifaces = talloc_steal(rec, ifaces);
3112
3113         talloc_free(mem_ctx);
3114         return ret;
3115 }
3116
3117 /* called to check that the local allocation of public ip addresses is ok.
3118 */
3119 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
3120 {
3121         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3122         struct ctdb_uptime *uptime1 = NULL;
3123         struct ctdb_uptime *uptime2 = NULL;
3124         int ret, j;
3125         bool need_takeover_run = false;
3126
3127         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3128                                 CTDB_CURRENT_NODE, &uptime1);
3129         if (ret != 0) {
3130                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3131                 talloc_free(mem_ctx);
3132                 return -1;
3133         }
3134
3135         if (interfaces_have_changed(ctdb, rec)) {
3136                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
3137                                      "local node %u - force takeover run\n",
3138                                      pnn));
3139                 need_takeover_run = true;
3140         }
3141
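        /* Read the uptime a second time.  If a recovery started or
         * finished between the two reads, the data gathered here may
         * be stale, so the checks below are skipped in that case.
         */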
3142         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3143                                 CTDB_CURRENT_NODE, &uptime2);
3144         if (ret != 0) {
3145                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3146                 talloc_free(mem_ctx);
3147                 return -1;
3148         }
3149
3150         /* skip the check if the startrecovery time has changed */
3151         if (timeval_compare(&uptime1->last_recovery_started,
3152                             &uptime2->last_recovery_started) != 0) {
3153                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3154                 talloc_free(mem_ctx);
3155                 return 0;
3156         }
3157
3158         /* skip the check if the endrecovery time has changed */
3159         if (timeval_compare(&uptime1->last_recovery_finished,
3160                             &uptime2->last_recovery_finished) != 0) {
3161                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3162                 talloc_free(mem_ctx);
3163                 return 0;
3164         }
3165
3166         /* skip the check if we have started but not finished recovery */
3167         if (timeval_compare(&uptime1->last_recovery_finished,
3168                             &uptime1->last_recovery_started) != 1) {
3169                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
3170                 talloc_free(mem_ctx);
3171
3172                 return 0;
3173         }
3174
3175         /* verify that we have the ip addresses we should have
3176            and we don't have ones we shouldn't have.
3177            if we find an inconsistency we set recmode to
3178            active on the local node and wait for the recmaster
3179            to do a full blown recovery.
3180            also if an ip has pnn -1 and we are healthy and can host it,
3181            we request an ip reallocation.
3182         */
3183         if (ctdb->tunable.disable_ip_failover == 0) {
3184                 struct ctdb_all_public_ips *ips = NULL;
3185
3186                 /* read the *available* IPs from the local node */
3187                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3188                 if (ret != 0) {
3189                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3190                         talloc_free(mem_ctx);
3191                         return -1;
3192                 }
3193
3194                 for (j=0; j<ips->num; j++) {
3195                         if (ips->ips[j].pnn == -1 &&
3196                             nodemap->nodes[pnn].flags == 0) {
3197                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3198                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3199                                 need_takeover_run = true;
3200                         }
3201                 }
3202
3203                 talloc_free(ips);
3204
3205                 /* read the *known* IPs from the local node */
3206                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3207                 if (ret != 0) {
3208                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3209                         talloc_free(mem_ctx);
3210                         return -1;
3211                 }
3212
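                /* Check that every ip assigned to this node is actually
                 * configured on an interface, and release any ip we are
                 * still holding that is not assigned to us.
                 */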
3213                 for (j=0; j<ips->num; j++) {
3214                         if (ips->ips[j].pnn == pnn) {
3215                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3216                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3217                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3218                                         need_takeover_run = true;
3219                                 }
3220                         } else {
3221                                 if (ctdb->do_checkpublicip &&
3222                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3223
3224                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3225                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3226
3227                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3228                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3229                                         }
3230                                 }
3231                         }
3232                 }
3233         }
3234
3235         if (need_takeover_run) {
3236                 struct srvid_request rd;
3237                 TDB_DATA data;
3238
3239                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3240
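                /* srvid 0 means we do not expect a reply - we just
                 * nudge the recovery master to do a takeover run.
                 */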
3241                 rd.pnn = ctdb->pnn;
3242                 rd.srvid = 0;
3243                 data.dptr = (uint8_t *)&rd;
3244                 data.dsize = sizeof(rd);
3245
3246                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3247                 if (ret != 0) {
3248                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3249                 }
3250         }
3251         talloc_free(mem_ctx);
3252         return 0;
3253 }
3254
3255
3256 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3257 {
3258         struct ctdb_node_map **remote_nodemaps = callback_data;
3259
3260         if (node_pnn >= ctdb->num_nodes) {
3261                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3262                 return;
3263         }
3264
3265         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3266
3267 }
3268
3269 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3270         struct ctdb_node_map *nodemap,
3271         struct ctdb_node_map **remote_nodemaps)
3272 {
3273         uint32_t *nodes;
3274
3275         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3276         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3277                                         nodes, 0,
3278                                         CONTROL_TIMEOUT(), false, tdb_null,
3279                                         async_getnodemap_callback,
3280                                         NULL,
3281                                         remote_nodemaps) != 0) {
3282                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3283
3284                 return -1;
3285         }
3286
3287         return 0;
3288 }
3289
3290 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
3291 struct ctdb_check_reclock_state {
3292         struct ctdb_context *ctdb;
3293         struct timeval start_time;
3294         int fd[2];
3295         pid_t child;
3296         struct timed_event *te;
3297         struct fd_event *fde;
3298         enum reclock_child_status status;
3299 };
3300
3301 /* when we free the reclock state we must kill any child process.
3302 */
3303 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
3304 {
3305         struct ctdb_context *ctdb = state->ctdb;
3306
3307         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
3308
3309         if (state->fd[0] != -1) {
3310                 close(state->fd[0]);
3311                 state->fd[0] = -1;
3312         }
3313         if (state->fd[1] != -1) {
3314                 close(state->fd[1]);
3315                 state->fd[1] = -1;
3316         }
3317         ctdb_kill(ctdb, state->child, SIGKILL);
3318         return 0;
3319 }
3320
3321 /*
3322   called if our check_reclock child times out. this would happen if
3323   i/o to the reclock file blocks.
3324  */
3325 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
3326                                          struct timeval t, void *private_data)
3327 {
3328         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
3329                                            struct ctdb_check_reclock_state);
3330
3331         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - cluster filesystem slow to grant locks?\n"));
3332         state->status = RECLOCK_TIMEOUT;
3333 }
3334
3335 /* this is called when the child process has completed checking the reclock
3336    file and has written data back to us through the pipe.
3337 */
3338 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
3339                              uint16_t flags, void *private_data)
3340 {
3341         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
3342                                              struct ctdb_check_reclock_state);
3343         char c = 0;
3344         int ret;
3345
3346         /* we got a response from our child process so we can abort the
3347            timeout.
3348         */
3349         talloc_free(state->te);
3350         state->te = NULL;
3351
3352         ret = read(state->fd[0], &c, 1);
3353         if (ret != 1 || c != RECLOCK_OK) {
3354                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3355                 state->status = RECLOCK_FAILED;
3356
3357                 return;
3358         }
3359
3360         state->status = RECLOCK_OK;
3361         return;
3362 }
3363
3364 static int check_recovery_lock(struct ctdb_context *ctdb)
3365 {
3366         int ret;
3367         struct ctdb_check_reclock_state *state;
3368         pid_t parent = getpid();
3369
3370         if (ctdb->recovery_lock_fd == -1) {
3371                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3372                 return -1;
3373         }
3374
3375         state = talloc(ctdb, struct ctdb_check_reclock_state);
3376         CTDB_NO_MEMORY(ctdb, state);
3377
3378         state->ctdb = ctdb;
3379         state->start_time = timeval_current();
3380         state->status = RECLOCK_CHECKING;
3381         state->fd[0] = -1;
3382         state->fd[1] = -1;
3383
3384         ret = pipe(state->fd);
3385         if (ret != 0) {
3386                 talloc_free(state);
3387                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3388                 return -1;
3389         }
3390
3391         state->child = ctdb_fork(ctdb);
3392         if (state->child == (pid_t)-1) {
3393                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3394                 close(state->fd[0]);
3395                 state->fd[0] = -1;
3396                 close(state->fd[1]);
3397                 state->fd[1] = -1;
3398                 talloc_free(state);
3399                 return -1;
3400         }
3401
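        /* Child: attempt to read a single byte from the recovery lock
         * file and report the result back to the parent through the
         * pipe.  If the cluster filesystem hangs, the read blocks and
         * the parent's timeout event fires instead.
         */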
3402         if (state->child == 0) {
3403                 char cc = RECLOCK_OK;
3404                 close(state->fd[0]);
3405                 state->fd[0] = -1;
3406
3407                 ctdb_set_process_name("ctdb_rec_reclock");
3408                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3409                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3410                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3411                         cc = RECLOCK_FAILED;
3412                 }
3413
3414                 write(state->fd[1], &cc, 1);
3415                 /* make sure we die when our parent dies */
3416                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3417                         sleep(5);
3418                 }
3419                 _exit(0);
3420         }
3421         close(state->fd[1]);
3422         state->fd[1] = -1;
3423         set_close_on_exec(state->fd[0]);
3424
3425         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3426
3427         talloc_set_destructor(state, check_reclock_destructor);
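        /* The destructor reports the measured reclock latency, closes
         * both pipe ends and kills the child, so freeing "state" below
         * always cleans up completely.
         */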
3428
3429         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3430                                     ctdb_check_reclock_timeout, state);
3431         if (state->te == NULL) {
3432                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3433                 talloc_free(state);
3434                 return -1;
3435         }
3436
3437         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3438                                 EVENT_FD_READ,
3439                                 reclock_child_handler,
3440                                 (void *)state);
3441
3442         if (state->fde == NULL) {
3443                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3444                 talloc_free(state);
3445                 return -1;
3446         }
3447         tevent_fd_set_auto_close(state->fde);
3448
3449         while (state->status == RECLOCK_CHECKING) {
3450                 event_loop_once(ctdb->ev);
3451         }
3452
3453         if (state->status == RECLOCK_FAILED) {
3454                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3455                 close(ctdb->recovery_lock_fd);
3456                 ctdb->recovery_lock_fd = -1;
3457                 talloc_free(state);
3458                 return -1;
3459         }
3460
3461         talloc_free(state);
3462         return 0;
3463 }
3464
3465 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3466 {
3467         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3468         const char *reclockfile;
3469
3470         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3471                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3472                 talloc_free(tmp_ctx);
3473                 return -1;      
3474         }
3475
3476         if (reclockfile == NULL) {
3477                 if (ctdb->recovery_lock_file != NULL) {
3478                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3479                         talloc_free(ctdb->recovery_lock_file);
3480                         ctdb->recovery_lock_file = NULL;
3481                         if (ctdb->recovery_lock_fd != -1) {
3482                                 close(ctdb->recovery_lock_fd);
3483                                 ctdb->recovery_lock_fd = -1;
3484                         }
3485                 }
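                /* No reclock file configured, so there is nothing to
                 * verify.
                 */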
3486                 ctdb->tunable.verify_recovery_lock = 0;
3487                 talloc_free(tmp_ctx);
3488                 return 0;
3489         }
3490
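        /* The daemon has a reclock file but we had none recorded yet -
         * remember the name and drop any lock fd we were still holding.
         */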
3491         if (ctdb->recovery_lock_file == NULL) {
3492                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3493                 if (ctdb->recovery_lock_fd != -1) {
3494                         close(ctdb->recovery_lock_fd);
3495                         ctdb->recovery_lock_fd = -1;
3496                 }
3497                 talloc_free(tmp_ctx);
3498                 return 0;
3499         }
3500
3501
3502         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3503                 talloc_free(tmp_ctx);
3504                 return 0;
3505         }
3506
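        /* The reclock file name has changed - record the new name,
         * turn off lock verification for now and close the old lock fd.
         */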
3507         talloc_free(ctdb->recovery_lock_file);
3508         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3509         ctdb->tunable.verify_recovery_lock = 0;
3510         if (ctdb->recovery_lock_fd != -1) {
3511                 close(ctdb->recovery_lock_fd);
3512                 ctdb->recovery_lock_fd = -1;
3513         }
3514
3515         talloc_free(tmp_ctx);
3516         return 0;
3517 }
3518
3519 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3520                       TALLOC_CTX *mem_ctx)
3521 {
3522         uint32_t pnn;
3523         struct ctdb_node_map *nodemap=NULL;
3524         struct ctdb_node_map *recmaster_nodemap=NULL;
3525         struct ctdb_node_map **remote_nodemaps=NULL;
3526         struct ctdb_vnn_map *vnnmap=NULL;
3527         struct ctdb_vnn_map *remote_vnnmap=NULL;
3528         int32_t debug_level;
3529         int i, j, ret;
3530         bool self_ban;
3531
3532
3533         /* verify that the main daemon is still running */
3534         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3535                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3536                 exit(-1);
3537         }
3538
3539         /* ping the local daemon to tell it we are alive */
3540         ctdb_ctrl_recd_ping(ctdb);
3541
3542         if (rec->election_timeout) {
3543                 /* an election is in progress */
3544                 return;
3545         }
3546
3547         /* read the debug level from the parent and update locally */
3548         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3549         if (ret !=0) {
3550                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3551                 return;
3552         }
3553         LogLevel = debug_level;
3554
3555         /* get relevant tunables */
3556         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3557         if (ret != 0) {
3558                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3559                 return;
3560         }
3561
3562         /* get the current recovery lock file from the server */
3563         if (update_recovery_lock_file(ctdb) != 0) {
3564                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3565                 return;
3566         }
3567
3568         /* Make sure that if recovery lock verification becomes disabled,
3569            we close the file
3570         */
3571         if (ctdb->tunable.verify_recovery_lock == 0) {
3572                 if (ctdb->recovery_lock_fd != -1) {
3573                         close(ctdb->recovery_lock_fd);
3574                         ctdb->recovery_lock_fd = -1;
3575                 }
3576         }
3577
3578         pnn = ctdb_get_pnn(ctdb);
3579
3580         /* get the vnnmap */
3581         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3582         if (ret != 0) {
3583                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3584                 return;
3585         }
3586
3587
3588         /* get number of nodes */
3589         if (rec->nodemap) {
3590                 talloc_free(rec->nodemap);
3591                 rec->nodemap = NULL;
3592                 nodemap=NULL;
3593         }
3594         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3595         if (ret != 0) {
3596                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3597                 return;
3598         }
3599         nodemap = rec->nodemap;
3600
3601         /* remember our own node flags */
3602         rec->node_flags = nodemap->nodes[pnn].flags;
3603
3604         ban_misbehaving_nodes(rec, &self_ban);
3605         if (self_ban) {
3606                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3607                 return;
3608         }
3609
3610         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3611            also frozen and that the recmode is set to active.
3612         */
3613         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3614                 /* If this node has become inactive then we want to
3615                  * reduce the chances of it taking over the recovery
3616                  * master role when it becomes active again.  This
3617                  * helps to stabilise the recovery master role so that
3618                  * it stays on the most stable node.
3619                  */
3620                 rec->priority_time = timeval_current();
3621
3622                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3623                 if (ret != 0) {
3624                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3625                 }
3626                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3627                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3628
3629                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3630                         if (ret != 0) {
3631                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3632                                 return;
3633                         }
3634                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3635                         if (ret != 0) {
3636                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3637
3638                                 return;
3639                         }
3640                 }
3641
3642                 /* If this node is stopped or banned then it is not the recovery
3643                  * master, so don't do anything. This prevents a stopped or banned
3644                  * node from starting an election and sending unnecessary controls.
3645                  */
3646                 return;
3647         }
3648
3649         /* check which node is the recovery master */
3650         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3651         if (ret != 0) {
3652                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3653                 return;
3654         }
3655
3656         /* If we are not the recmaster then do some housekeeping */
3657         if (rec->recmaster != pnn) {
3658                 /* Ignore any IP reallocate requests - only recmaster
3659                  * processes them
3660                  */
3661                 TALLOC_FREE(rec->reallocate_requests);
3662                 /* Clear any nodes that should be force rebalanced in
3663                  * the next takeover run.  If the recovery master role
3664                  * has moved then we don't want to process these some
3665                  * time in the future.
3666                  */
3667                 TALLOC_FREE(rec->force_rebalance_nodes);
3668         }
3669
3670         /* This is a special case.  When the recovery daemon is started,
3671          * recmaster is set to -1.  If the node is not started in the stopped
3672          * state, then start an election to decide the recovery master.
3673          */
3674         if (rec->recmaster == (uint32_t)-1) {
3675                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3676                 force_election(rec, pnn, nodemap);
3677                 return;
3678         }
3679
3680         /* update the capabilities for all nodes */
3681         ret = update_capabilities(ctdb, nodemap);
3682         if (ret != 0) {
3683                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3684                 return;
3685         }
3686
3687         /*
3688          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3689          * but we have, then force an election and try to become the new
3690          * recmaster.
3691          */
3692         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3693             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3694              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3695                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3696                                   " but we (node %u) have - force an election\n",
3697                                   rec->recmaster, pnn));
3698                 force_election(rec, pnn, nodemap);
3699                 return;
3700         }
3701
3702         /* count how many active nodes there are */
3703         rec->num_active    = 0;
3704         rec->num_connected = 0;
3705         for (i=0; i<nodemap->num; i++) {
3706                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3707                         rec->num_active++;
3708                 }
3709                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3710                         rec->num_connected++;
3711                 }
3712         }
3713
3714
3715         /* verify that the recmaster node is still active */
3716         for (j=0; j<nodemap->num; j++) {
3717                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3718                         break;
3719                 }
3720         }
3721
3722         if (j == nodemap->num) {
3723                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3724                 force_election(rec, pnn, nodemap);
3725                 return;
3726         }
3727
3728         /* if recovery master is disconnected we must elect a new recmaster */
3729         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3730                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3731                 force_election(rec, pnn, nodemap);
3732                 return;
3733         }
3734
3735         /* get nodemap from the recovery master to check if it is inactive */
3736         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3737                                    mem_ctx, &recmaster_nodemap);
3738         if (ret != 0) {
3739                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3740                           nodemap->nodes[j].pnn));
3741                 return;
3742         }
3743
3744
3745         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3746             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3747                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3748                 /*
3749                  * update our nodemap to carry the recmaster's notion of
3750                  * its own flags, so that we don't keep freezing the
3751                  * inactive recmaster node...
3752                  */
3753                 nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
3754                 force_election(rec, pnn, nodemap);
3755                 return;
3756         }
3757
3758         /* verify that we have all ip addresses we should have and we don't
3759          * have addresses we shouldn't have.
3760          */
3761         if (ctdb->tunable.disable_ip_failover == 0 &&
3762             rec->takeover_runs_disable_ctx == NULL) {
3763                 if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3764                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3765                 }
3766         }
3767
3768
3769         /* if we are not the recmaster then we do not need to check
3770            if recovery is needed
3771          */
3772         if (pnn != rec->recmaster) {
3773                 return;
3774         }
3775
3776
3777         /* ensure our local copies of flags are right */
3778         ret = update_local_flags(rec, nodemap);
3779         if (ret == MONITOR_ELECTION_NEEDED) {
3780                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3781                 force_election(rec, pnn, nodemap);
3782                 return;
3783         }
3784         if (ret != MONITOR_OK) {
3785                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3786                 return;
3787         }
3788
3789         if (ctdb->num_nodes != nodemap->num) {
3790                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3791                 reload_nodes_file(ctdb);
3792                 return;
3793         }
3794
3795         /* verify that all active nodes agree that we are the recmaster */
3796         switch (verify_recmaster(rec, nodemap, pnn)) {
3797         case MONITOR_RECOVERY_NEEDED:
3798                 /* can not happen */
3799                 return;
3800         case MONITOR_ELECTION_NEEDED:
3801                 force_election(rec, pnn, nodemap);
3802                 return;
3803         case MONITOR_OK:
3804                 break;
3805         case MONITOR_FAILED:
3806                 return;
3807         }
3808
3809
3810         if (rec->need_recovery) {
3811                 /* a previous recovery didn't finish */
3812                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3813                 return;
3814         }
3815
3816         /* verify that all active nodes are in normal mode 
3817            and not in recovery mode 
3818         */
3819         switch (verify_recmode(ctdb, nodemap)) {
3820         case MONITOR_RECOVERY_NEEDED:
3821                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3822                 return;
3823         case MONITOR_FAILED:
3824                 return;
3825         case MONITOR_ELECTION_NEEDED:
3826                 /* can not happen */
3827         case MONITOR_OK:
3828                 break;
3829         }
3830
3831
3832         if (ctdb->tunable.verify_recovery_lock != 0) {
3833                 /* we should have the reclock - check its not stale */
3834                 ret = check_recovery_lock(ctdb);
3835                 if (ret != 0) {
3836                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3837                         ctdb_set_culprit(rec, ctdb->pnn);
3838                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3839                         return;
3840                 }
3841         }
3842
3843
3844         /* if there are takeovers requested, perform it and notify the waiters */
3845         if (rec->takeover_runs_disable_ctx == NULL &&
3846             rec->reallocate_requests) {
3847                 process_ipreallocate_requests(ctdb, rec);
3848         }
3849
3850         /* get the nodemap for all active remote nodes
3851          */
3852         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3853         if (remote_nodemaps == NULL) {
3854                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3855                 return;
3856         }
3857         for(i=0; i<nodemap->num; i++) {
3858                 remote_nodemaps[i] = NULL;
3859         }
3860         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3861                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3862                 return;
3863         } 
3864
3865         /* verify that all other nodes have the same nodemap as we have
3866         */
3867         for (j=0; j<nodemap->num; j++) {
3868                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3869                         continue;
3870                 }
3871
3872                 if (remote_nodemaps[j] == NULL) {
3873                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3874                         ctdb_set_culprit(rec, j);
3875
3876                         return;
3877                 }
3878
3879                 /* if the nodes disagree on how many nodes there are
3880                    then this is a good reason to try recovery
3881                  */
3882                 if (remote_nodemaps[j]->num != nodemap->num) {
3883                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3884                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3885                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3886                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3887                         return;
3888                 }
3889
3890                 /* if the nodes disagree on which nodes exist and are
3891                    active, then that is also a good reason to do recovery
3892                  */
3893                 for (i=0;i<nodemap->num;i++) {
3894                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3895                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3896                                           nodemap->nodes[j].pnn, i, 
3897                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3898                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3899                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3900                                             vnnmap);
3901                                 return;
3902                         }
3903                 }
3904         }
3905
3906         /*
3907          * Update node flags obtained from each active node. This ensures we
3908          * have up-to-date information for all the nodes.
3909          */
3910         for (j=0; j<nodemap->num; j++) {
3911                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3912                         continue;
3913                 }
3914                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3915         }
3916
3917         for (j=0; j<nodemap->num; j++) {
3918                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3919                         continue;
3920                 }
3921
3922                 /* verify the flags are consistent
3923                 */
3924                 for (i=0; i<nodemap->num; i++) {
3925                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3926                                 continue;
3927                         }
3928                         
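                        /* On a mismatch: if the remote node disagrees
                         * about its own flags, its own view wins;
                         * otherwise our (recmaster) view wins.  Either
                         * way the chosen flags are pushed to all nodes
                         * and a recovery is started.
                         */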
3929                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3930                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3931                                   nodemap->nodes[j].pnn, 
3932                                   nodemap->nodes[i].pnn, 
3933                                   remote_nodemaps[j]->nodes[i].flags,
3934                                   nodemap->nodes[i].flags));
3935                                 if (i == j) {
3936                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3937                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3938                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3939                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3940                                                     vnnmap);
3941                                         return;
3942                                 } else {
3943                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3944                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3945                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3946                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3947                                                     vnnmap);
3948                                         return;
3949                                 }
3950                         }
3951                 }
3952         }
3953
3954
3955         /* there had better be the same number of lmasters in the vnnmap
3956            as there are active nodes, or we will have to do a recovery
3957          */
3958         if (vnnmap->size != rec->num_active) {
3959                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3960                           vnnmap->size, rec->num_active));
3961                 ctdb_set_culprit(rec, ctdb->pnn);
3962                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3963                 return;
3964         }
3965
3966         /* verify that all active nodes in the nodemap also exist in 
3967            the vnnmap.
3968          */
3969         for (j=0; j<nodemap->num; j++) {
3970                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3971                         continue;
3972                 }
3973                 if (nodemap->nodes[j].pnn == pnn) {
3974                         continue;
3975                 }
3976
3977                 for (i=0; i<vnnmap->size; i++) {
3978                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3979                                 break;
3980                         }
3981                 }
3982                 if (i == vnnmap->size) {
3983                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but does not exist in the vnnmap\n", 
3984                                   nodemap->nodes[j].pnn));
3985                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3986                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3987                         return;
3988                 }
3989         }
3990
3991         
3992         /* verify that all other nodes have the same vnnmap
3993            and are from the same generation
3994          */
3995         for (j=0; j<nodemap->num; j++) {
3996                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3997                         continue;
3998                 }
3999                 if (nodemap->nodes[j].pnn == pnn) {
4000                         continue;
4001                 }
4002
4003                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
4004                                           mem_ctx, &remote_vnnmap);
4005                 if (ret != 0) {
4006                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
4007                                   nodemap->nodes[j].pnn));
4008                         return;
4009                 }
4010
4011                 /* verify the vnnmap generation is the same */
4012                 if (vnnmap->generation != remote_vnnmap->generation) {
4013                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
4014                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
4015                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4016                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4017                         return;
4018                 }
4019
4020                 /* verify the vnnmap size is the same */
4021                 if (vnnmap->size != remote_vnnmap->size) {
4022                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
4023                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
4024                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4025                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4026                         return;
4027                 }
4028
4029                 /* verify the vnnmap is the same */
4030                 for (i=0;i<vnnmap->size;i++) {
4031                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
4032                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has a different vnnmap.\n",
4033                                           nodemap->nodes[j].pnn));
4034                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4035                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
4036                                             vnnmap);
4037                                 return;
4038                         }
4039                 }
4040         }
4041
4042         /* we might need to change who has what IP assigned */
4043         if (rec->need_takeover_run) {
4044                 uint32_t culprit = (uint32_t)-1;
4045
4046                 rec->need_takeover_run = false;
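                     /* Cleared optimistically here; it is set back to
                      * true below if reloading the remote public IPs
                      * fails, so the takeover run will be retried on a
                      * later iteration.
                      */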
4047
4048                 /* update the list of public ips that a node can handle for
4049                    all connected nodes
4050                 */
4051                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
4052                 if (ret != 0) {
4053                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
4054                                          culprit));
4055                         rec->need_takeover_run = true;
4056                         return;
4057                 }
4058
4059                 /* execute the "startrecovery" event script on all nodes */
4060                 ret = run_startrecovery_eventscript(rec, nodemap);
4061                 if (ret!=0) {
4062                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
4063                         ctdb_set_culprit(rec, ctdb->pnn);
4064                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4065                         return;
4066                 }
4067
4068                 /* If the takeover run fails, the offending nodes are
4069                  * assigned ban culprit counts and the run is retried;
4070                  * a node that keeps causing failures will eventually
4071                  * be banned.
4072                  *
4073                  * If rec->need_takeover_run is not set back to true on
4074                  * such a failure, monitoring stays disabled cluster-wide
4075                  * (via the startrecovery eventscript) and never re-enabled.
4076                  */
4077                 if (!do_takeover_run(rec, nodemap, true)) {
4078                         return;
4079                 }
4080
4081                 /* execute the "recovered" event script on all nodes */
4082                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
4083 #if 0
4084 // We can't check whether the event completed successfully, since this
4085 // script WILL fail if the node is in recovery mode; if that race
4086 // happens, the code here would just cause a second, cascading
4087 // recovery.
4088                 if (ret!=0) {
4089                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
4090                         ctdb_set_culprit(rec, ctdb->pnn);
4091                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4092                 }
4093 #endif
4094         }
4095 }
4096
4097 /*
4098   the main monitoring loop
4099  */
4100 static void monitor_cluster(struct ctdb_context *ctdb)
4101 {
4102         struct ctdb_recoverd *rec;
4103
4104         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
4105
4106         rec = talloc_zero(ctdb, struct ctdb_recoverd);
4107         CTDB_NO_MEMORY_FATAL(ctdb, rec);
4108
4109         rec->ctdb = ctdb;
4110
4111         rec->takeover_run_in_progress = false;
4112
4113         rec->priority_time = timeval_current();
4114
4115         /* register a message port for sending memory dumps */
4116         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
4117
4118         /* register a message port for requesting logs */
4119         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_GETLOG, getlog_handler, rec);
4120
4121         /* register a message port for clearing logs */
4122         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_CLEARLOG, clearlog_handler, rec);
4123
4124         /* register a message port for recovery elections */
4125         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
4126
4127         /* when nodes are disabled/enabled */
4128         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
4129
4130         /* when we are asked to push out a flag change */
4131         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
4132
4133         /* register a message port for vacuum fetch */
4134         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
4135
4136         /* register a message port for reloadnodes  */
4137         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
4138
4139         /* register a message port for performing a takeover run */
4140         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
4141
4142         /* register a message port for disabling the ip check for a short while */
4143         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
4144
4145         /* register a message port for updating the recovery daemons node assignment for an ip */
4146         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
4147
4148         /* register a message port for forcing a rebalance of a node at
4149            the next reallocation */
4150         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
4151
4152         /* Register a message port for disabling takeover runs */
4153         ctdb_client_set_message_handler(ctdb,
4154                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
4155                                         disable_takeover_runs_handler, rec);
4156
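             /* Loop forever: each pass runs main_loop() inside a fresh
              * temporary talloc context so that everything allocated
              * during the pass is released before the next one starts.
              */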
4157         for (;;) {
4158                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
4159                 struct timeval start;
4160                 double elapsed;
4161
4162                 if (!mem_ctx) {
4163                         DEBUG(DEBUG_CRIT,(__location__
4164                                           " Failed to create temp context\n"));
4165                         exit(-1);
4166                 }
4167
4168                 start = timeval_current();
4169                 main_loop(ctdb, rec, mem_ctx);
4170                 talloc_free(mem_ctx);
4171
4172                 /* we only check for recovery once every recover_interval seconds */
4173                 elapsed = timeval_elapsed(&start);
4174                 if (elapsed < ctdb->tunable.recover_interval) {
4175                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
4176                                           - elapsed);
4177                 }
4178         }
4179 }
4180
4181 /*
4182   event handler for when the main ctdbd dies
4183  */
4184 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
4185                                  uint16_t flags, void *private_data)
4186 {
4187         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
4188         _exit(1);
4189 }
4190
4191 /*
4192   called regularly to verify that the recovery daemon is still running
4193  */
4194 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
4195                               struct timeval t, void *p)
4196 {
4197         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
4198
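             /* Signal 0 is not delivered; it only tests whether the
              * recovery daemon process still exists.
              */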
4199         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
4200                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
4201
4202                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
4203                                 ctdb_restart_recd, ctdb);
4204
4205                 return;
4206         }
4207
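             /* Still alive - schedule the next check in 30 seconds */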
4208         event_add_timed(ctdb->ev, ctdb->recd_ctx,
4209                         timeval_current_ofs(30, 0),
4210                         ctdb_check_recd, ctdb);
4211 }
4212
4213 static void recd_sig_child_handler(struct event_context *ev,
4214         struct signal_event *se, int signum, int count,
4215         void *dont_care, 
4216         void *private_data)
4217 {
4218 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4219         int status;
4220         pid_t pid = -1;
4221
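             /* Reap all exited children without blocking: waitpid() with
              * WNOHANG returns 0 once no more exited children remain and
              * -1 with ECHILD when there are no children at all.
              */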
4222         while (pid != 0) {
4223                 pid = waitpid(-1, &status, WNOHANG);
4224                 if (pid == -1) {
4225                         if (errno != ECHILD) {
4226                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4227                         }
4228                         return;
4229                 }
4230                 if (pid > 0) {
4231                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4232                 }
4233         }
4234 }
4235
4236 /*
4237   startup the recovery daemon as a child of the main ctdb daemon
4238  */
4239 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4240 {
4241         int fd[2];
4242         struct signal_event *se;
4243         struct tevent_fd *fde;
4244
4245         if (pipe(fd) != 0) {
4246                 return -1;
4247         }
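             /* The pipe is used only to detect the death of the main
              * ctdb daemon: the parent keeps the write end open while
              * the child watches the read end, so when ctdbd exits the
              * read end becomes readable (EOF) and ctdb_recoverd_parent()
              * terminates the recovery daemon.
              */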
4248
4249         ctdb->ctdbd_pid = getpid();
4250
4251         ctdb->recoverd_pid = ctdb_fork_no_free_ringbuffer(ctdb);
4252         if (ctdb->recoverd_pid == -1) {
4253                 return -1;
4254         }
4255
4256         if (ctdb->recoverd_pid != 0) {
4257                 talloc_free(ctdb->recd_ctx);
4258                 ctdb->recd_ctx = talloc_new(ctdb);
4259                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4260
4261                 close(fd[0]);
4262                 event_add_timed(ctdb->ev, ctdb->recd_ctx,
4263                                 timeval_current_ofs(30, 0),
4264                                 ctdb_check_recd, ctdb);
4265                 return 0;
4266         }
4267
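             /* From here on we are the child: the recovery daemon itself */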
4268         close(fd[1]);
4269
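             /* Re-seed the random number generator (pid xor current time)
              * so this recovery daemon does not reuse the parent's
              * sequence.
              */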
4270         srandom(getpid() ^ time(NULL));
4271
4272         /* Clear the log ringbuffer */
4273         ctdb_clear_log(ctdb);
4274
4275         ctdb_set_process_name("ctdb_recoverd");
4276         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4277                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. Shutting down.\n"));
4278                 exit(1);
4279         }
4280
4281         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4282
4283         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
4284                      ctdb_recoverd_parent, &fd[0]);
4285         tevent_fd_set_auto_close(fde);
4286
4287         /* set up a handler to pick up sigchld */
4288         se = event_add_signal(ctdb->ev, ctdb,
4289                                      SIGCHLD, 0,
4290                                      recd_sig_child_handler,
4291                                      ctdb);
4292         if (se == NULL) {
4293                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4294                 exit(1);
4295         }
4296
4297         monitor_cluster(ctdb);
4298
4299         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4300         return -1;
4301 }
4302
4303 /*
4304   shutdown the recovery daemon
4305  */
4306 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4307 {
4308         if (ctdb->recoverd_pid == 0) {
4309                 return;
4310         }
4311
4312         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4313         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4314
4315         TALLOC_FREE(ctdb->recd_ctx);
4316         TALLOC_FREE(ctdb->recd_ping_count);
4317 }
4318
4319 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
4320                        struct timeval t, void *private_data)
4321 {
4322         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4323
4324         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4325         ctdb_stop_recoverd(ctdb);
4326         ctdb_start_recoverd(ctdb);
4327 }