ctdb/server/ctdb_recoverd.c
/*
   ctdb recovery daemon

   Copyright (C) Ronnie Sahlberg  2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/

#include "replace.h"
#include "system/filesys.h"
#include "system/time.h"
#include "system/network.h"
#include "system/wait.h"

#include <popt.h>
#include <talloc.h>
#include <tevent.h>
#include <tdb.h>

#include "lib/tdb_wrap/tdb_wrap.h"
#include "lib/util/dlinklist.h"
#include "lib/util/debug.h"
#include "lib/util/samba_util.h"

#include "ctdb_private.h"
#include "ctdb_client.h"
#include "ctdb_logging.h"

#include "common/system.h"
#include "common/cmdline.h"
#include "common/common.h"


/* List of SRVID requests that need to be processed */
struct srvid_list {
        struct srvid_list *next, *prev;
        struct ctdb_srvid_message *request;
};

struct srvid_requests {
        struct srvid_list *requests;
};

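/*
  Reply to a queued SRVID request and free it.  A request with
  srvid == 0 does not expect a reply.
 */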
static void srvid_request_reply(struct ctdb_context *ctdb,
                                struct ctdb_srvid_message *request,
                                TDB_DATA result)
{
        /* Someone that sent srvid==0 does not want a reply */
        if (request->srvid == 0) {
                talloc_free(request);
                return;
        }

        if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
                                     result) == 0) {
                DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
                                  (unsigned)request->pnn,
                                  (unsigned long long)request->srvid));
        } else {
                DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
                                 (unsigned)request->pnn,
                                 (unsigned long long)request->srvid));
        }

        talloc_free(request);
}

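/*
  Reply to all requests in a list with the given result and free the
  list
 */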
static void srvid_requests_reply(struct ctdb_context *ctdb,
                                 struct srvid_requests **requests,
                                 TDB_DATA result)
{
        struct srvid_list *r;

        for (r = (*requests)->requests; r != NULL; r = r->next) {
                srvid_request_reply(ctdb, r->request, result);
        }

        /* Free the list structure... */
        TALLOC_FREE(*requests);
}

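/*
  Queue a request to be answered later, when the operation it is
  waiting for has completed.  On allocation failure the request is
  immediately answered with -ENOMEM.
 */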
static void srvid_request_add(struct ctdb_context *ctdb,
                              struct srvid_requests **requests,
                              struct ctdb_srvid_message *request)
{
        struct srvid_list *t;
        int32_t ret;
        TDB_DATA result;

        if (*requests == NULL) {
                *requests = talloc_zero(ctdb, struct srvid_requests);
                if (*requests == NULL) {
                        goto nomem;
                }
        }

        t = talloc_zero(*requests, struct srvid_list);
        if (t == NULL) {
                /* If *requests was just allocated above then free it */
                if ((*requests)->requests == NULL) {
                        TALLOC_FREE(*requests);
                }
                goto nomem;
        }

        t->request = (struct ctdb_srvid_message *)talloc_steal(t, request);
        DLIST_ADD((*requests)->requests, t);

        return;

nomem:
        /* Failed to add the request to the list.  Send a fail. */
        DEBUG(DEBUG_ERR, (__location__
                          " Out of memory, failed to queue SRVID request\n"));
        ret = -ENOMEM;
        result.dsize = sizeof(ret);
        result.dptr = (uint8_t *)&ret;
        srvid_request_reply(ctdb, request, result);
}

/* An abstraction to allow an operation (takeover runs, recoveries,
 * ...) to be disabled for a given timeout */
struct ctdb_op_state {
        struct tevent_timer *timer;
        bool in_progress;
        const char *name;
};

static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
{
        struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);

        if (state != NULL) {
                state->in_progress = false;
                state->name = name;
        }

        return state;
}

static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
{
        return state->timer != NULL;
}

static bool ctdb_op_begin(struct ctdb_op_state *state)
{
        if (ctdb_op_is_disabled(state)) {
                DEBUG(DEBUG_NOTICE,
                      ("Unable to begin - %s are disabled\n", state->name));
                return false;
        }

        state->in_progress = true;
        return true;
}

static bool ctdb_op_end(struct ctdb_op_state *state)
{
        return state->in_progress = false;
}

static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
{
        return state->in_progress;
}

static void ctdb_op_enable(struct ctdb_op_state *state)
{
        TALLOC_FREE(state->timer);
}

static void ctdb_op_timeout_handler(struct tevent_context *ev,
                                    struct tevent_timer *te,
                                    struct timeval yt, void *p)
{
        struct ctdb_op_state *state =
                talloc_get_type(p, struct ctdb_op_state);

        DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
        ctdb_op_enable(state);
}

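/*
  Disable an operation for a number of seconds, or re-enable it when
  called with a timeout of 0.  Disabling fails with -EAGAIN while the
  operation is in progress.
 */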
static int ctdb_op_disable(struct ctdb_op_state *state,
                           struct tevent_context *ev,
                           uint32_t timeout)
{
        if (timeout == 0) {
                DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
                ctdb_op_enable(state);
                return 0;
        }

        if (state->in_progress) {
                DEBUG(DEBUG_ERR,
                      ("Unable to disable %s - in progress\n", state->name));
                return -EAGAIN;
        }

        DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
                            state->name, timeout));

        /* Clear any old timers */
        talloc_free(state->timer);

        /* Arrange for the timeout to occur */
        state->timer = tevent_add_timer(ev, state,
                                        timeval_current_ofs(timeout, 0),
                                        ctdb_op_timeout_handler, state);
        if (state->timer == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
                return -ENOMEM;
        }

        return 0;
}
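
/* Usage sketch for the ctdb_op helpers (illustrative only, not part
 * of the original file; names follow this file's conventions):
 *
 *   rec->takeover_run = ctdb_op_init(rec, "takeover runs");
 *   ...
 *   if (ctdb_op_begin(rec->takeover_run)) {
 *           ... perform the takeover run ...
 *           ctdb_op_end(rec->takeover_run);
 *   }
 *
 * A message handler can call ctdb_op_disable(state, ev, timeout) to
 * suspend the operation; the tevent timer then re-enables it via
 * ctdb_op_timeout_handler when the timeout expires.
 */
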
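/*
  per-node accounting of misbehaviour, used to decide when a node has
  been a culprit often enough to be banned
 */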
struct ctdb_banning_state {
        uint32_t count;
        struct timeval last_reported_time;
};

/*
  private state of recovery daemon
 */
struct ctdb_recoverd {
        struct ctdb_context *ctdb;
        uint32_t recmaster;
        uint32_t last_culprit_node;
        struct ctdb_node_map_old *nodemap;
        struct timeval priority_time;
        bool need_takeover_run;
        bool need_recovery;
        uint32_t node_flags;
        struct tevent_timer *send_election_te;
        struct tevent_timer *election_timeout;
        struct srvid_requests *reallocate_requests;
        struct ctdb_op_state *takeover_run;
        struct ctdb_op_state *recovery;
        struct ctdb_iface_list_old *ifaces;
        uint32_t *force_rebalance_nodes;
        struct ctdb_node_capabilities *caps;
};

#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)

static void ctdb_restart_recd(struct tevent_context *ev,
                              struct tevent_timer *te, struct timeval t,
                              void *private_data);

/*
  ban a node for a period of time
 */
static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
{
        int ret;
        struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_ban_state bantime;

        if (!ctdb_validate_pnn(ctdb, pnn)) {
                DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
                return;
        }

        DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));

        bantime.pnn  = pnn;
        bantime.time = ban_time;

        ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
                return;
        }
}

enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};


/*
  remember the troublemaker
 */
static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
{
        struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
        struct ctdb_banning_state *ban_state;

        /* culprit is used to index ctdb->nodes[], so the bounds check
           must also reject culprit == num_nodes */
        if (culprit >= ctdb->num_nodes) {
                DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
                return;
        }

        /* If we are banned or stopped, do not set other nodes as culprits */
        if (rec->node_flags & NODE_FLAGS_INACTIVE) {
                DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
                return;
        }

        if (ctdb->nodes[culprit]->ban_state == NULL) {
                ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
                CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
        }
        ban_state = ctdb->nodes[culprit]->ban_state;
        if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
                /* this was the first time in a long while this node
                   misbehaved so we will forgive any old transgressions.
                */
                ban_state->count = 0;
        }

        ban_state->count += count;
        ban_state->last_reported_time = timeval_current();
        rec->last_culprit_node = culprit;
}

/*
  remember the troublemaker
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
        ctdb_set_culprit_count(rec, culprit, 1);
}


/* this callback is called for every node that failed to execute the
   recovered event
*/
static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));

        ctdb_set_culprit(rec, node_pnn);
}

/*
  run the "recovered" eventscript on all nodes
 */
static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, const char *caller)
{
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;
        struct ctdb_context *ctdb = rec->ctdb;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL, recovered_fail_callback,
                                        rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));

                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

/* this callback is called for every node that failed to execute the
   start recovery event
*/
static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));

        ctdb_set_culprit(rec, node_pnn);
}

/*
  run the "startrecovery" eventscript on all nodes
 */
static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
{
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;
        struct ctdb_context *ctdb = rec->ctdb;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL,
                                        startrecovery_fail_callback,
                                        rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

/*
  update the node capabilities for all connected nodes
 */
static int update_capabilities(struct ctdb_recoverd *rec,
                               struct ctdb_node_map_old *nodemap)
{
        uint32_t *capp;
        TALLOC_CTX *tmp_ctx;
        struct ctdb_node_capabilities *caps;
        struct ctdb_context *ctdb = rec->ctdb;

        tmp_ctx = talloc_new(rec);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        caps = ctdb_get_capabilities(ctdb, tmp_ctx,
                                     CONTROL_TIMEOUT(), nodemap);

        if (caps == NULL) {
                DEBUG(DEBUG_ERR,
                      (__location__ " Failed to get node capabilities\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
        if (capp == NULL) {
                DEBUG(DEBUG_ERR,
                      (__location__
                       " Capabilities don't include current node.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
        ctdb->capabilities = *capp;

        TALLOC_FREE(rec->caps);
        rec->caps = talloc_steal(rec, caps);

        talloc_free(tmp_ctx);
        return 0;
}

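/*
  fail callbacks for the freeze and transaction-start controls below:
  a failing node is blamed with one culprit credit per node in the
  map, so it accumulates ban credits quickly if it keeps failing
 */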
static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
        ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
}

static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);

        DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
        ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
}

/*
  change recovery mode on all nodes
 */
static int set_recovery_mode(struct ctdb_context *ctdb,
                             struct ctdb_recoverd *rec,
                             struct ctdb_node_map_old *nodemap,
                             uint32_t rec_mode, bool freeze)
{
        TDB_DATA data;
        uint32_t *nodes;
        TALLOC_CTX *tmp_ctx;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);

        data.dsize = sizeof(uint32_t);
        data.dptr = (unsigned char *)&rec_mode;

        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(),
                                        false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        /* freeze all nodes */
        if (freeze && rec_mode == CTDB_RECOVERY_ACTIVE) {
                int i;

                for (i=1; i<=NUM_DB_PRIORITIES; i++) {
                        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
                                                nodes, i,
                                                CONTROL_TIMEOUT(),
                                                false, tdb_null,
                                                NULL,
                                                set_recmode_fail_callback,
                                                rec) != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
                                talloc_free(tmp_ctx);
                                return -1;
                        }
                }
        }

        talloc_free(tmp_ctx);
        return 0;
}

/*
  change recovery master on all nodes
 */
static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn)
{
        TDB_DATA data;
        TALLOC_CTX *tmp_ctx;
        uint32_t *nodes;

        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);

        data.dsize = sizeof(uint32_t);
        data.dptr = (unsigned char *)&pnn;

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}

/* update all remote nodes to use the same db priority that we have
   this can fail if the remote node has not yet been upgraded to
   support this function, so we always return success and never fail
   a recovery if this call fails.
*/
static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
        struct ctdb_node_map_old *nodemap,
        uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
{
        int db;

        /* step through all local databases */
        for (db=0; db<dbmap->num;db++) {
                struct ctdb_db_priority db_prio;
                int ret;

                db_prio.db_id     = dbmap->dbs[db].db_id;
                ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].db_id, &db_prio.priority);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].db_id));
                        continue;
                }

                DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].db_id, db_prio.priority));

                ret = ctdb_ctrl_set_db_priority(ctdb, CONTROL_TIMEOUT(),
                                                CTDB_CURRENT_NODE, &db_prio);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n",
                                         db_prio.db_id));
                }
        }

        return 0;
}

/*
  ensure all other nodes have attached to any databases that we have
 */
static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap,
                                           uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map_old *remote_dbmap;

        /* verify that all other nodes have all our databases */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
                        return -1;
                }

                /* step through all local databases */
                for (db=0; db<dbmap->num;db++) {
                        const char *name;

                        for (i=0;i<remote_dbmap->num;i++) {
                                if (dbmap->dbs[db].db_id == remote_dbmap->dbs[i].db_id) {
                                        break;
                                }
                        }
                        /* the remote node already has this database */
                        if (i!=remote_dbmap->num) {
                                continue;
                        }
                        /* ok so we need to create this database */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
                                                  dbmap->dbs[db].db_id, mem_ctx,
                                                  &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
                                                 nodemap->nodes[j].pnn,
                                                 mem_ctx, name,
                                                 dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
                                return -1;
                        }
                }
        }

        return 0;
}


/*
  ensure we are attached to any databases that anyone else is attached to
 */
static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap,
                                          uint32_t pnn, struct ctdb_dbid_map_old **dbmap, TALLOC_CTX *mem_ctx)
{
        int i, j, db, ret;
        struct ctdb_dbid_map_old *remote_dbmap;

        /* verify that we have all databases any other node has */
        for (j=0; j<nodemap->num; j++) {
                /* we don't need to check ourselves */
                if (nodemap->nodes[j].pnn == pnn) {
                        continue;
                }
                /* don't check nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                         mem_ctx, &remote_dbmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
                        return -1;
                }

                /* step through all databases on the remote node */
                for (db=0; db<remote_dbmap->num;db++) {
                        const char *name;

                        for (i=0;i<(*dbmap)->num;i++) {
                                if (remote_dbmap->dbs[db].db_id == (*dbmap)->dbs[i].db_id) {
                                        break;
                                }
                        }
                        /* we already have this db locally */
                        if (i!=(*dbmap)->num) {
                                continue;
                        }
                        /* ok so we need to create this database and
                           rebuild dbmap
                         */
                        ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                                  remote_dbmap->dbs[db].db_id, mem_ctx, &name);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n",
                                          nodemap->nodes[j].pnn));
                                return -1;
                        }
                        ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name,
                                                 remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
                                return -1;
                        }
                        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
                                return -1;
                        }
                }
        }

        return 0;
}


/*
  pull the remote database contents from one node into the recdb
 */
static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
                                    struct tdb_wrap *recdb, uint32_t dbid)
{
        int ret;
        TDB_DATA outdata;
        struct ctdb_marshall_buffer *reply;
        struct ctdb_rec_data_old *recdata;
        int i;
        TALLOC_CTX *tmp_ctx = talloc_new(recdb);

        ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
                               CONTROL_TIMEOUT(), &outdata);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
                talloc_free(tmp_ctx);
                return -1;
        }

        reply = (struct ctdb_marshall_buffer *)outdata.dptr;

        if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
                DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        recdata = (struct ctdb_rec_data_old *)&reply->data[0];

        for (i=0;
             i<reply->count;
             recdata = (struct ctdb_rec_data_old *)(recdata->length + (uint8_t *)recdata), i++) {
                TDB_DATA key, data;
                struct ctdb_ltdb_header *hdr;
                TDB_DATA existing;

                key.dptr = &recdata->data[0];
                key.dsize = recdata->keylen;
                data.dptr = &recdata->data[key.dsize];
                data.dsize = recdata->datalen;

                hdr = (struct ctdb_ltdb_header *)data.dptr;

                if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                        DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
                        talloc_free(tmp_ctx);
                        return -1;
                }

                /* fetch the existing record, if any */
                existing = tdb_fetch(recdb->tdb, key);

                if (existing.dptr != NULL) {
                        struct ctdb_ltdb_header header;
                        if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
                                DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n",
                                         (unsigned)existing.dsize, srcnode));
                                free(existing.dptr);
                                talloc_free(tmp_ctx);
                                return -1;
                        }
                        header = *(struct ctdb_ltdb_header *)existing.dptr;
                        free(existing.dptr);
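                        /* keep the pulled copy only if it has a higher
                           RSN, or an equal RSN while the existing copy
                           is not dmastered by this node */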
                        if (!(header.rsn < hdr->rsn ||
                              (header.dmaster != ctdb_get_pnn(ctdb) &&
                               header.rsn == hdr->rsn))) {
                                continue;
                        }
                }

                if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
                        DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
                        talloc_free(tmp_ctx);
                        return -1;
                }
        }

        talloc_free(tmp_ctx);

        return 0;
}


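/*
  callback state for finding the node that holds the highest sequence
  number for a persistent database
 */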
struct pull_seqnum_cbdata {
        int failed;
        uint32_t pnn;
        uint64_t seqnum;
};

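/*
  called with each node's reply to GET_DB_SEQNUM; tracks the node with
  the highest sequence number seen so far
 */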
static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
        uint64_t seqnum;

        if (cb_data->failed != 0) {
                DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
                return;
        }

        if (res != 0) {
                DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
                cb_data->failed = 1;
                return;
        }

        if (outdata.dsize != sizeof(uint64_t)) {
                DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
                cb_data->failed = 1;
                return;
        }

        seqnum = *((uint64_t *)outdata.dptr);

        if (seqnum > cb_data->seqnum ||
            (cb_data->pnn == -1 && seqnum == 0)) {
                cb_data->seqnum = seqnum;
                cb_data->pnn = node_pnn;
        }
}

static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
{
        struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);

        DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
        cb_data->failed = 1;
}

static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
                                struct ctdb_recoverd *rec,
                                struct ctdb_node_map_old *nodemap,
                                struct tdb_wrap *recdb, uint32_t dbid)
{
        TALLOC_CTX *tmp_ctx = talloc_new(NULL);
        uint32_t *nodes;
        TDB_DATA data;
        uint32_t outdata[2];
        struct pull_seqnum_cbdata *cb_data;

        DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));

        outdata[0] = dbid;
        outdata[1] = 0;

        data.dsize = sizeof(outdata);
        data.dptr  = (uint8_t *)&outdata[0];

        cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
        if (cb_data == NULL) {
                DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
                talloc_free(tmp_ctx);
                return -1;
        }

        cb_data->failed = 0;
        cb_data->pnn    = -1;
        cb_data->seqnum = 0;

        nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        pull_seqnum_cb,
                                        pull_seqnum_fail_cb,
                                        cb_data) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));

                talloc_free(tmp_ctx);
                return -1;
        }

        if (cb_data->failed != 0) {
                DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
                talloc_free(tmp_ctx);
                return -1;
        }

        if (cb_data->pnn == -1) {
                DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
                talloc_free(tmp_ctx);
                return -1;
        }

        DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum));

        if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
                DEBUG(DEBUG_ERR, ("Failed to pull highest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
                talloc_free(tmp_ctx);
                return -1;
        }

        talloc_free(tmp_ctx);
        return 0;
}


/*
  pull all the remote database contents into the recdb
 */
static int pull_remote_database(struct ctdb_context *ctdb,
                                struct ctdb_recoverd *rec,
                                struct ctdb_node_map_old *nodemap,
                                struct tdb_wrap *recdb, uint32_t dbid,
                                bool persistent)
{
        int j;

        if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
                int ret;
                ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
                if (ret == 0) {
                        return 0;
                }
        }

        /* pull all records from all other nodes across onto this node
           (this merges based on rsn)
        */
        for (j=0; j<nodemap->num; j++) {
                /* don't merge from nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
                if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n",
                                 nodemap->nodes[j].pnn));
                        ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
                        return -1;
                }
        }

        return 0;
}


/*
  update flags on all active nodes
 */
static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn, uint32_t flags)
{
        int ret;

        ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                return -1;
        }

        return 0;
}

/*
  ensure all nodes have the same vnnmap we do
 */
static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap,
                                      uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
{
        int j, ret;

        /* push the new vnn map out to all the nodes */
        for (j=0; j<nodemap->num; j++) {
                /* don't push to nodes that are unavailable */
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }

                ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
                        return -1;
                }
        }

        return 0;
}


/*
  called when a vacuum fetch has completed - just free it and do the next one
 */
static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
{
        talloc_free(state);
}


/**
 * Process one element of the vacuum fetch list:
 * Migrate it over to us with the special flag
 * CTDB_CALL_FLAG_VACUUM_MIGRATION.
 */
static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
                                     uint32_t pnn,
                                     struct ctdb_rec_data_old *r)
{
        struct ctdb_client_call_state *state;
        TDB_DATA data;
        struct ctdb_ltdb_header *hdr;
        struct ctdb_call call;

        ZERO_STRUCT(call);
        call.call_id = CTDB_NULL_FUNC;
        call.flags = CTDB_IMMEDIATE_MIGRATION;
        call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;

        call.key.dptr = &r->data[0];
        call.key.dsize = r->keylen;

        /* ensure we don't block this daemon - just skip a record if we can't get
           the chainlock */
        if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, call.key) != 0) {
                return true;
        }

        data = tdb_fetch(ctdb_db->ltdb->tdb, call.key);
        if (data.dptr == NULL) {
                tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
                return true;
        }

        if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
                free(data.dptr);
                tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
                return true;
        }

        hdr = (struct ctdb_ltdb_header *)data.dptr;
        if (hdr->dmaster == pnn) {
                /* it's already local */
                free(data.dptr);
                tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
                return true;
        }

        free(data.dptr);

        state = ctdb_call_send(ctdb_db, &call);
        tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
        if (state == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
                return false;
        }
        state->async.fn = vacuum_fetch_callback;
        state->async.private_data = NULL;

        return true;
}


/*
  handler for vacuum fetch
*/
static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
                                 void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(
                private_data, struct ctdb_recoverd);
        struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_marshall_buffer *recs;
        int ret, i;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        const char *name;
        struct ctdb_dbid_map_old *dbmap=NULL;
        bool persistent = false;
        struct ctdb_db_context *ctdb_db;
        struct ctdb_rec_data_old *r;

        recs = (struct ctdb_marshall_buffer *)data.dptr;

        if (recs->count == 0) {
                goto done;
        }

        /* work out if the database is persistent */
        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
                goto done;
        }

        for (i=0;i<dbmap->num;i++) {
                if (dbmap->dbs[i].db_id == recs->db_id) {
                        persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
                        break;
                }
        }
        if (i == dbmap->num) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
                goto done;
        }

        /* find the name of this database */
        if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
                goto done;
        }

        /* attach to it */
        ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
                goto done;
        }

        r = (struct ctdb_rec_data_old *)&recs->data[0];
        while (recs->count) {
                bool ok;

                ok = vacuum_fetch_process_one(ctdb_db, rec->ctdb->pnn, r);
                if (!ok) {
                        break;
                }

                r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
                recs->count--;
        }

done:
        talloc_free(tmp_ctx);
}


/*
 * handler for database detach
 */
static void detach_database_handler(uint64_t srvid, TDB_DATA data,
                                    void *private_data)
{
        struct ctdb_recoverd *rec = talloc_get_type(
                private_data, struct ctdb_recoverd);
        struct ctdb_context *ctdb = rec->ctdb;
        uint32_t db_id;
        struct ctdb_db_context *ctdb_db;

        if (data.dsize != sizeof(db_id)) {
                return;
        }
        db_id = *(uint32_t *)data.dptr;

        ctdb_db = find_ctdb_db(ctdb, db_id);
        if (ctdb_db == NULL) {
                /* database is not attached */
                return;
        }

        DLIST_REMOVE(ctdb->db_list, ctdb_db);

        DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
                             ctdb_db->db_name));
        talloc_free(ctdb_db);
}

/*
  called when ctdb_wait_timeout should finish
 */
static void ctdb_wait_handler(struct tevent_context *ev,
                              struct tevent_timer *te,
                              struct timeval yt, void *p)
{
        uint32_t *timed_out = (uint32_t *)p;
        (*timed_out) = 1;
}

/*
  wait for a given number of seconds
 */
static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
{
        uint32_t timed_out = 0;
        time_t usecs = (secs - (time_t)secs) * 1000000;
        tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs),
                         ctdb_wait_handler, &timed_out);
        while (!timed_out) {
                tevent_loop_once(ctdb->ev);
        }
}

/*
  called when an election times out (ends)
 */
static void ctdb_election_timeout(struct tevent_context *ev,
                                  struct tevent_timer *te,
                                  struct timeval t, void *p)
{
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
        rec->election_timeout = NULL;
        fast_start = false;

        DEBUG(DEBUG_WARNING,("Election period ended\n"));
}


/*
  wait for an election to finish. It finishes election_timeout seconds after
  the last election packet is received
 */
static void ctdb_wait_election(struct ctdb_recoverd *rec)
{
        struct ctdb_context *ctdb = rec->ctdb;
        while (rec->election_timeout) {
                tevent_loop_once(ctdb->ev);
        }
}

/*
  Update our local flags from all remote connected nodes.
  This is only run when we are, or we believe we are, the recovery master
 */
static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
{
        int j;
        struct ctdb_context *ctdb = rec->ctdb;
        TALLOC_CTX *mem_ctx = talloc_new(ctdb);

        /* get the nodemap for all active remote nodes and verify
           they are the same as for this node
         */
        for (j=0; j<nodemap->num; j++) {
                struct ctdb_node_map_old *remote_nodemap=NULL;
                int ret;

                if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
                        continue;
                }
                if (nodemap->nodes[j].pnn == ctdb->pnn) {
                        continue;
                }

                ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
                                           mem_ctx, &remote_nodemap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n",
                                  nodemap->nodes[j].pnn));
                        ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
                        talloc_free(mem_ctx);
                        return MONITOR_FAILED;
                }
                if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
                        /* We should tell our daemon about this so it
                           updates its flags or else we will log the same
                           message again in the next iteration of recovery.
                           Since we are the recovery master we can just as
                           well update the flags on all nodes.
                        */
                        ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                                talloc_free(mem_ctx);
                                return -1;
                        }

                        /* Update our local copy of the flags in the recovery
                           daemon.
                        */
                        DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
                                 nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
                                 nodemap->nodes[j].flags));
                        nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
                }
                talloc_free(remote_nodemap);
        }
        talloc_free(mem_ctx);
        return MONITOR_OK;
}


/* Create a new random generation id.
   The generation id cannot be the INVALID_GENERATION id
*/
static uint32_t new_generation(void)
{
        uint32_t generation;

        while (1) {
                generation = random();

                if (generation != INVALID_GENERATION) {
                        break;
                }
        }

        return generation;
}


/*
  create a temporary working database
 */
static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
{
        char *name;
        struct tdb_wrap *recdb;
        unsigned tdb_flags;

        /* open up the temporary recovery database */
        name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
                               ctdb->db_directory_state,
                               ctdb->pnn);
        if (name == NULL) {
                return NULL;
        }
        unlink(name);

        tdb_flags = TDB_NOLOCK;
        if (ctdb->valgrinding) {
                tdb_flags |= TDB_NOMMAP;
        }
        tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);

        recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size,
                              tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
        if (recdb == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
        }

        talloc_free(name);

        return recdb;
}


/*
   a traverse function for pulling all relevant records from recdb
 */
struct recdb_data {
        struct ctdb_context *ctdb;
        struct ctdb_marshall_buffer *recdata;
        uint32_t len;
        uint32_t allocated_len;
        bool failed;
        bool persistent;
};

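/*
  copy one record from the recdb into the marshall buffer that will be
  pushed out to all nodes
 */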
static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
{
        struct recdb_data *params = (struct recdb_data *)p;
        struct ctdb_rec_data_old *recdata;
        struct ctdb_ltdb_header *hdr;

        /*
         * skip empty records - but NOT for persistent databases:
         *
         * The record-by-record mode of recovery deletes empty records.
         * For persistent databases, this can lead to data corruption
         * by deleting records that should be there:
         *
         * - Assume the cluster has been running for a while.
         *
         * - A record R in a persistent database has been created and
         *   deleted a couple of times, the last operation being deletion,
         *   leaving an empty record with a high RSN, say 10.
         *
         * - Now a node N is turned off.
         *
         * - This leaves the local database copy of D on N with the empty
         *   copy of R and RSN 10. On all other nodes, the recovery has deleted
         *   the copy of record R.
         *
         * - Now the record is created again while node N is turned off.
         *   This creates R with RSN = 1 on all nodes except for N.
         *
         * - Now node N is turned on again. The following recovery will choose
         *   the older empty copy of R due to RSN 10 > RSN 1.
         *
         * ==> Hence the record is gone after the recovery.
         *
         * On databases like Samba's registry, this can damage the higher-level
         * data structures built from the various tdb-level records.
         */
        if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
                return 0;
        }

        /* update the dmaster field to point to us */
        hdr = (struct ctdb_ltdb_header *)data.dptr;
        if (!params->persistent) {
                hdr->dmaster = params->ctdb->pnn;
                hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
        }

        /* add the record to the blob ready to send to the nodes */
        recdata = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
        if (recdata == NULL) {
                params->failed = true;
                return -1;
        }
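        /* grow the marshall buffer in chunks, preallocating extra
           space (pulldb_preallocation_size) to limit the number of
           reallocations */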
        if (params->len + recdata->length >= params->allocated_len) {
                params->allocated_len = recdata->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
                params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
        }
        if (params->recdata == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
                         recdata->length + params->len));
                params->failed = true;
                return -1;
        }
        params->recdata->count++;
        memcpy(params->len+(uint8_t *)params->recdata, recdata, recdata->length);
        params->len += recdata->length;
        talloc_free(recdata);

        return 0;
}
1433
1434 /*
1435   push the recdb database out to all nodes
1436  */
1437 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1438                                bool persistent,
1439                                struct tdb_wrap *recdb, struct ctdb_node_map_old *nodemap)
1440 {
1441         struct recdb_data params;
1442         struct ctdb_marshall_buffer *recdata;
1443         TDB_DATA outdata;
1444         TALLOC_CTX *tmp_ctx;
1445         uint32_t *nodes;
1446
1447         tmp_ctx = talloc_new(ctdb);
1448         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1449
1450         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1451         CTDB_NO_MEMORY(ctdb, recdata);
1452
1453         recdata->db_id = dbid;
1454
1455         params.ctdb = ctdb;
1456         params.recdata = recdata;
1457         params.len = offsetof(struct ctdb_marshall_buffer, data);
1458         params.allocated_len = params.len;
1459         params.failed = false;
1460         params.persistent = persistent;
1461
1462         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1463                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1464                 talloc_free(params.recdata);
1465                 talloc_free(tmp_ctx);
1466                 return -1;
1467         }
1468
1469         if (params.failed) {
1470                 DEBUG(DEBUG_ERR,(__location__ " Failed to marshall recdb records for database 0x%x\n", dbid));
1471                 talloc_free(params.recdata);
1472                 talloc_free(tmp_ctx);
1473                 return -1;
1474         }
1475
1476         recdata = params.recdata;
1477
1478         outdata.dptr = (void *)recdata;
1479         outdata.dsize = params.len;
1480
1481         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1482         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1483                                         nodes, 0,
1484                                         CONTROL_TIMEOUT(), false, outdata,
1485                                         NULL, NULL,
1486                                         NULL) != 0) {
1487                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1488                 talloc_free(recdata);
1489                 talloc_free(tmp_ctx);
1490                 return -1;
1491         }
1492
1493         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x with %u records\n",
1494                   dbid, recdata->count));
1495
1496         talloc_free(recdata);
1497         talloc_free(tmp_ctx);
1498
1499         return 0;
1500 }
1501
1502
1503 /*
1504   go through a full recovery on one database 
1505  */
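/* The sequence below, in order:
 *   1. create_recdb()             - create a local scratch tdb
 *   2. pull_remote_database()     - merge the best copy of every record
 *                                   from all active nodes into the recdb
 *   3. CTDB_CONTROL_WIPE_DATABASE - clear the database on all nodes,
 *                                   inside the recovery transaction
 *   4. push_recdb_database()      - push the merged records back out
 */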
1506 static int recover_database(struct ctdb_recoverd *rec, 
1507                             TALLOC_CTX *mem_ctx,
1508                             uint32_t dbid,
1509                             bool persistent,
1510                             uint32_t pnn, 
1511                             struct ctdb_node_map_old *nodemap,
1512                             uint32_t transaction_id)
1513 {
1514         struct tdb_wrap *recdb;
1515         int ret;
1516         struct ctdb_context *ctdb = rec->ctdb;
1517         TDB_DATA data;
1518         struct ctdb_transdb w;
1519         uint32_t *nodes;
1520
1521         recdb = create_recdb(ctdb, mem_ctx);
1522         if (recdb == NULL) {
1523                 return -1;
1524         }
1525
1526         /* pull all remote databases onto the recdb */
1527         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1528         if (ret != 0) {
1529                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1530                 return -1;
1531         }
1532
1533         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1534
1535         /* wipe all the remote databases. This is safe as we are in a transaction */
1536         w.db_id = dbid;
1537         w.tid = transaction_id;
1538
1539         data.dptr = (void *)&w;
1540         data.dsize = sizeof(w);
1541
1542         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1543         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1544                                         nodes, 0,
1545                                         CONTROL_TIMEOUT(), false, data,
1546                                         NULL, NULL,
1547                                         NULL) != 0) {
1548                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1549                 talloc_free(recdb);
1550                 return -1;
1551         }
1552         
1553         /* push out the correct database. This sets the dmaster and skips 
1554            the empty records */
1555         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1556         if (ret != 0) {
1557                 talloc_free(recdb);
1558                 return -1;
1559         }
1560
1561         /* all done with this database */
1562         talloc_free(recdb);
1563
1564         return 0;
1565 }
1566
1567 /* when we start a recovery, make sure all nodes use the same reclock file
1568    setting
1569 */
1570 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1571 {
1572         struct ctdb_context *ctdb = rec->ctdb;
1573         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1574         TDB_DATA data;
1575         uint32_t *nodes;
1576
1577         if (ctdb->recovery_lock_file == NULL) {
1578                 data.dptr  = NULL;
1579                 data.dsize = 0;
1580         } else {
1581                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1582                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1583         }
1584
1585         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1586         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1587                                         nodes, 0,
1588                                         CONTROL_TIMEOUT(),
1589                                         false, data,
1590                                         NULL, NULL,
1591                                         rec) != 0) {
1592                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1593                 talloc_free(tmp_ctx);
1594                 return -1;
1595         }
1596
1597         talloc_free(tmp_ctx);
1598         return 0;
1599 }
1600
1601
1602 /*
1603  * this callback is called for every node that failed to execute ctdb_takeover_run();
1604  * it gives the failing node a banning credit so that repeat offenders can be banned.
1605  */
1606 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1607 {
1608         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1609
1610         if (callback_data != NULL) {
1611                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1612
1613                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1614
1615                 ctdb_set_culprit(rec, node_pnn);
1616         }
1617 }
1618
1619
1620 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1621 {
1622         struct ctdb_context *ctdb = rec->ctdb;
1623         int i;
1624         struct ctdb_banning_state *ban_state;
1625
1626         *self_ban = false;
1627         for (i=0; i<ctdb->num_nodes; i++) {
1628                 if (ctdb->nodes[i]->ban_state == NULL) {
1629                         continue;
1630                 }
1631                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1632                 if (ban_state->count < 2*ctdb->num_nodes) {
1633                         continue;
1634                 }
1635
1636                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1637                         ctdb->nodes[i]->pnn, ban_state->count,
1638                         ctdb->tunable.recovery_ban_period));
1639                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1640                 ban_state->count = 0;
1641
1642                 /* Banning ourself? */
1643                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1644                         *self_ban = true;
1645                 }
1646         }
1647 }
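
/* Worked example of the threshold above: in a 3-node cluster a node is
 * banned once it has accumulated 2 * 3 = 6 banning credits, i.e. it has
 * been blamed for six recovery problems; its credit count then resets
 * to 0.
 */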
1648
1649 static bool do_takeover_run(struct ctdb_recoverd *rec,
1650                             struct ctdb_node_map_old *nodemap,
1651                             bool banning_credits_on_fail)
1652 {
1653         uint32_t *nodes = NULL;
1654         struct ctdb_disable_message dtr;
1655         TDB_DATA data;
1656         int i;
1657         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1658         int ret;
1659         bool ok;
1660
1661         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1662
1663         if (ctdb_op_is_in_progress(rec->takeover_run)) {
1664                 DEBUG(DEBUG_ERR, (__location__
1665                                   " takeover run already in progress\n"));
1666                 ok = false;
1667                 goto done;
1668         }
1669
1670         if (!ctdb_op_begin(rec->takeover_run)) {
1671                 ok = false;
1672                 goto done;
1673         }
1674
1675         /* Disable IP checks (takeover runs, really) on other nodes
1676          * while doing this takeover run.  This will stop those other
1677          * nodes from triggering takeover runs when they think they should
1678          * be hosting an IP but it isn't yet on an interface.  Don't
1679          * wait for replies since a failure here might cause some
1680          * noise in the logs but will not actually cause a problem.
1681          */
1682         dtr.srvid = 0; /* No reply */
1683         dtr.pnn = -1;
1684
1685         data.dptr  = (uint8_t*)&dtr;
1686         data.dsize = sizeof(dtr);
1687
1688         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1689
1690         /* Disable for 60 seconds.  This can be a tunable later if
1691          * necessary.
1692          */
1693         dtr.timeout = 60;
1694         for (i = 0; i < talloc_array_length(nodes); i++) {
1695                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1696                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1697                                              data) != 0) {
1698                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1699                 }
1700         }
1701
1702         ret = ctdb_takeover_run(rec->ctdb, nodemap,
1703                                 rec->force_rebalance_nodes,
1704                                 takeover_fail_callback,
1705                                 banning_credits_on_fail ? rec : NULL);
1706
1707         /* Reenable takeover runs and IP checks on other nodes */
1708         dtr.timeout = 0;
1709         for (i = 0; i < talloc_array_length(nodes); i++) {
1710                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1711                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1712                                              data) != 0) {
1713                         DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
1714                 }
1715         }
1716
1717         if (ret != 0) {
1718                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1719                 ok = false;
1720                 goto done;
1721         }
1722
1723         ok = true;
1724         /* Takeover run was successful so clear force rebalance targets */
1725         if (rebalance_nodes == rec->force_rebalance_nodes) {
1726                 TALLOC_FREE(rec->force_rebalance_nodes);
1727         } else {
1728                 DEBUG(DEBUG_WARNING,
1729                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1730         }
1731 done:
1732         rec->need_takeover_run = !ok;
1733         talloc_free(nodes);
1734         ctdb_op_end(rec->takeover_run);
1735
1736         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1737         return ok;
1738 }
1739
1740 struct recovery_helper_state {
1741         int fd[2];
1742         pid_t pid;
1743         int result;
1744         bool done;
1745 };
1746
1747 static void ctdb_recovery_handler(struct tevent_context *ev,
1748                                   struct tevent_fd *fde,
1749                                   uint16_t flags, void *private_data)
1750 {
1751         struct recovery_helper_state *state = talloc_get_type_abort(
1752                 private_data, struct recovery_helper_state);
1753         int ret;
1754
1755         ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
1756         if (ret != sizeof(state->result)) {
1757                 state->result = EPIPE;
1758         }
1759
1760         state->done = true;
1761 }
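
/* A minimal sketch of the helper's side of this one-integer status pipe
 * (an assumption - the real code lives in ctdb_recovery_helper): the
 * helper writes its int result to the fd it was given as its first
 * argument; a short read on our side is treated as EPIPE above.
 */
#if 0
#include <unistd.h>

static void helper_report_result(int write_fd, int result)
{
        /* the recovery daemon reads exactly sizeof(result) bytes */
        sys_write(write_fd, &result, sizeof(result));
        close(write_fd);
}
#endif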
1762
1763
1764 static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
1765 {
1766         static char prog[PATH_MAX+1] = "";
1767         const char **args;
1768         struct recovery_helper_state *state;
1769         struct tevent_fd *fde;
1770         int nargs, ret;
1771
1772         if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
1773                              "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
1774                              "ctdb_recovery_helper")) {
1775                 ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
1776         }
1777
1778         state = talloc_zero(mem_ctx, struct recovery_helper_state);
1779         if (state == NULL) {
1780                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1781                 return -1;
1782         }
1783
1784         state->pid = -1;
1785
1786         ret = pipe(state->fd);
1787         if (ret != 0) {
1788                 DEBUG(DEBUG_ERR,
1789                       ("Failed to create pipe for recovery helper\n"));
1790                 goto fail;
1791         }
1792
1793         set_close_on_exec(state->fd[0]);
1794
1795         nargs = 4;
1796         args = talloc_array(state, const char *, nargs);
1797         if (args == NULL) {
1798                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1799                 goto fail;
1800         }
1801
1802         args[0] = talloc_asprintf(args, "%d", state->fd[1]);
1803         args[1] = rec->ctdb->daemon.name;
1804         args[2] = talloc_asprintf(args, "%u", new_generation());
1805         args[3] = NULL;
1806
1807         if (args[0] == NULL || args[2] == NULL) {
1808                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1809                 goto fail;
1810         }
1811
1812         if (!ctdb_vfork_with_logging(state, rec->ctdb, "recovery", prog, nargs,
1813                                      args, NULL, NULL, &state->pid)) {
1814                 DEBUG(DEBUG_ERR,
1815                       ("Failed to create child for recovery helper\n"));
1816                 goto fail;
1817         }
1818
1819         close(state->fd[1]);
1820         state->fd[1] = -1;
1821
1822         state->done = false;
1823
1824         fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
1825                             TEVENT_FD_READ, ctdb_recovery_handler, state);
1826         if (fde == NULL) {
1827                 goto fail;
1828         }
1829         tevent_fd_set_auto_close(fde);
1830
1831         while (!state->done) {
1832                 tevent_loop_once(rec->ctdb->ev);
1833         }
1834
1835         close(state->fd[0]);
1836         state->fd[0] = -1;
1837
1838         if (state->result != 0) {
1839                 goto fail;
1840         }
1841
1842         ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1843         talloc_free(state);
1844         return 0;
1845
1846 fail:
1847         if (state->fd[0] != -1) {
1848                 close(state->fd[0]);
1849         }
1850         if (state->fd[1] != -1) {
1851                 close(state->fd[1]);
1852         }
1853         if (state->pid != -1) {
1854                 ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1855         }
1856         talloc_free(state);
1857         return -1;
1858 }
1859
1860 static int db_recovery_serial(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
1861                               uint32_t pnn, struct ctdb_node_map_old *nodemap,
1862                               struct ctdb_vnn_map *vnnmap,
1863                               struct ctdb_dbid_map_old *dbmap)
1864 {
1865         struct ctdb_context *ctdb = rec->ctdb;
1866         uint32_t generation;
1867         TDB_DATA data;
1868         uint32_t *nodes;
1869         int ret, i, j;
1870
1871         /* set recovery mode to active on all nodes */
1872         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, true);
1873         if (ret != 0) {
1874                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1875                 return -1;
1876         }
1877
1878         /* execute the "startrecovery" event script on all nodes */
1879         ret = run_startrecovery_eventscript(rec, nodemap);
1880         if (ret!=0) {
1881                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1882                 return -1;
1883         }
1884
1885         /* pick a new generation number */
1886         generation = new_generation();
1887
1888         /* Change the vnnmap on this node, but not on any other node,
1889            to use the new generation number.
1890            This guarantees that if we abort the recovery prematurely
1891            for some reason (a node stops responding?) we can just
1892            return immediately and we will re-enter recovery shortly
1893            afterwards.
1894            I.e. we deliberately leave the cluster with an inconsistent
1895            generation id so that we can abort recovery at any stage and
1896            just restart it from scratch.
1897          */
1898         vnnmap->generation = generation;
1899         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1900         if (ret != 0) {
1901                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1902                 return -1;
1903         }
1904
1905         /* Database generations are updated when the transaction is committed to
1906          * the databases.  So make sure to use the final generation as the
1907          * transaction id
1908          */
1909         generation = new_generation();
1910
1911         data.dptr = (void *)&generation;
1912         data.dsize = sizeof(uint32_t);
1913
1914         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1915         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1916                                         nodes, 0,
1917                                         CONTROL_TIMEOUT(), false, data,
1918                                         NULL,
1919                                         transaction_start_fail_callback,
1920                                         rec) != 0) {
1921                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1922                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1923                                         nodes, 0,
1924                                         CONTROL_TIMEOUT(), false, tdb_null,
1925                                         NULL,
1926                                         NULL,
1927                                         NULL) != 0) {
1928                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1929                 }
1930                 return -1;
1931         }
1932
1933         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1934
1935         for (i=0;i<dbmap->num;i++) {
1936                 ret = recover_database(rec, mem_ctx,
1937                                        dbmap->dbs[i].db_id,
1938                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1939                                        pnn, nodemap, generation);
1940                 if (ret != 0) {
1941                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].db_id));
1942                         return -1;
1943                 }
1944         }
1945
1946         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1947
1948         /* commit all the changes */
1949         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1950                                         nodes, 0,
1951                                         CONTROL_TIMEOUT(), false, data,
1952                                         NULL, NULL,
1953                                         NULL) != 0) {
1954                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1955                 return -1;
1956         }
1957
1958         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1959
1960         /* build a new vnn map with all the currently active and
1961            unbanned nodes */
1962         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1963         CTDB_NO_MEMORY(ctdb, vnnmap);
1964         vnnmap->generation = generation;
1965         vnnmap->size = 0;
1966         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1967         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1968         for (i=j=0;i<nodemap->num;i++) {
1969                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1970                         continue;
1971                 }
1972                 if (!ctdb_node_has_capabilities(rec->caps,
1973                                                 ctdb->nodes[i]->pnn,
1974                                                 CTDB_CAP_LMASTER)) {
1975                         /* this node can not be an lmaster */
1976                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an lmaster, skipping it\n", i));
1977                         continue;
1978                 }
1979
1980                 vnnmap->size++;
1981                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1982                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1983                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1984
1985         }
1986         if (vnnmap->size == 0) {
1987                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1988                 vnnmap->size++;
1989                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1990                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1991                 vnnmap->map[0] = pnn;
1992         }
1993
1994         /* update to the new vnnmap on all nodes */
1995         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1996         if (ret != 0) {
1997                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1998                 return -1;
1999         }
2000
2001         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
2002
2003         /* update recmaster to point to us for all nodes */
2004         ret = set_recovery_master(ctdb, nodemap, pnn);
2005         if (ret!=0) {
2006                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
2007                 return -1;
2008         }
2009
2010         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
2011
2012         /* disable recovery mode */
2013         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL, false);
2014         if (ret != 0) {
2015                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
2016                 return -1;
2017         }
2018
2019         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
2020
2021         return 0;
2022 }
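
/* Sketch of how the vnnmap built above is consumed (an assumption,
 * mirroring ctdb_lmaster() in the main daemon): the lmaster for a record
 * is picked by hashing its key modulo the vnnmap size.
 */
#if 0
static uint32_t lmaster_for_key(const struct ctdb_vnn_map *vnnmap,
                                TDB_DATA key)
{
        uint32_t hash = ctdb_hash(&key);

        return vnnmap->map[hash % vnnmap->size];
}
#endif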
2023
2024 /*
2025   we are the recmaster, and recovery is needed - start a recovery run
2026  */
2027 static int do_recovery(struct ctdb_recoverd *rec,
2028                        TALLOC_CTX *mem_ctx, uint32_t pnn,
2029                        struct ctdb_node_map_old *nodemap, struct ctdb_vnn_map *vnnmap)
2030 {
2031         struct ctdb_context *ctdb = rec->ctdb;
2032         int i, ret;
2033         struct ctdb_dbid_map_old *dbmap;
2034         struct timeval start_time;
2035         bool self_ban;
2036         bool par_recovery;
2037
2038         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
2039
2040         /* Check if the current node is still the recmaster.  It's possible that
2041          * re-election has changed the recmaster, but we have not yet updated
2042          * that information.
2043          */
2044         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2045                                      pnn, &ctdb->recovery_master);
2046         if (ret != 0) {
2047                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster\n"));
2048                 return -1;
2049         }
2050
2051         if (pnn != ctdb->recovery_master) {
2052                 DEBUG(DEBUG_NOTICE,
2053                       ("Recovery master changed to %u, aborting recovery\n",
2054                        ctdb->recovery_master));
2055                 return -1;
2056         }
2057
2058         /* if recovery fails, force it again */
2059         rec->need_recovery = true;
2060
2061         if (!ctdb_op_begin(rec->recovery)) {
2062                 return -1;
2063         }
2064
2065         if (rec->election_timeout) {
2066                 /* an election is in progress */
2067                 DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
2068                 goto fail;
2069         }
2070
2071         ban_misbehaving_nodes(rec, &self_ban);
2072         if (self_ban) {
2073                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
2074                 goto fail;
2075         }
2076
2077         if (ctdb->recovery_lock_file != NULL) {
2078                 if (ctdb_recovery_have_lock(ctdb)) {
2079                         DEBUG(DEBUG_NOTICE, ("Already holding recovery lock\n"));
2080                 } else {
2081                         start_time = timeval_current();
2082                         DEBUG(DEBUG_NOTICE, ("Attempting to take recovery lock (%s)\n",
2083                                              ctdb->recovery_lock_file));
2084                         if (!ctdb_recovery_lock(ctdb)) {
2085                                 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
2086                                         /* If ctdb is trying first recovery, it's
2087                                          * possible that the current node does not
2088                                          * yet know who the recmaster is.
2089                                          */
2090                                         DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
2091                                                           " - retrying recovery\n"));
2092                                         goto fail;
2093                                 }
2094
2095                                 DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
2096                                                  "and banning ourselves for %u seconds\n",
2097                                                  ctdb->tunable.recovery_ban_period));
2098                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2099                                 goto fail;
2100                         }
2101                         ctdb_ctrl_report_recd_lock_latency(ctdb,
2102                                                            CONTROL_TIMEOUT(),
2103                                                            timeval_elapsed(&start_time));
2104                         DEBUG(DEBUG_NOTICE,
2105                               ("Recovery lock taken successfully by recovery daemon\n"));
2106                 }
2107         }
2108
2109         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
2110
2111         /* get a list of all databases */
2112         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
2113         if (ret != 0) {
2114                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
2115                 goto fail;
2116         }
2117
2118         /* we do the db creation before we set the recovery mode, so the freeze happens
2119            on all databases we will be dealing with. */
2120
2121         /* verify that we have all the databases any other node has */
2122         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
2123         if (ret != 0) {
2124                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
2125                 goto fail;
2126         }
2127
2128         /* verify that all other nodes have all our databases */
2129         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
2130         if (ret != 0) {
2131                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
2132                 goto fail;
2133         }
2134         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
2135
2136         /* update the database priority for all remote databases */
2137         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
2138         if (ret != 0) {
2139                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
2140         }
2141         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
2142
2143
2144         /* update all other nodes to use the same setting for reclock files
2145            as the local recovery master.
2146         */
2147         sync_recovery_lock_file_across_cluster(rec);
2148
2149         /* update the capabilities for all nodes */
2150         ret = update_capabilities(rec, nodemap);
2151         if (ret!=0) {
2152                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2153                 goto fail;
2154         }
2155
2156         /*
2157           update all nodes to have the same flags that we have
2158          */
2159         for (i=0;i<nodemap->num;i++) {
2160                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2161                         continue;
2162                 }
2163
2164                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
2165                 if (ret != 0) {
2166                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2167                                 DEBUG(DEBUG_WARNING, (__location__ " Unable to update flags on inactive node %d\n", i));
2168                         } else {
2169                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
2170                                 goto fail;
2171                         }
2172                 }
2173         }
2174
2175         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
2176
2177         /* Check if all participating nodes have parallel recovery capability */
2178         par_recovery = true;
2179         for (i=0; i<nodemap->num; i++) {
2180                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2181                         continue;
2182                 }
2183
2184                 if (!(rec->caps[i].capabilities &
2185                       CTDB_CAP_PARALLEL_RECOVERY)) {
2186                         par_recovery = false;
2187                         break;
2188                 }
2189         }
2190
2191         if (par_recovery) {
2192                 ret = db_recovery_parallel(rec, mem_ctx);
2193         } else {
2194                 ret = db_recovery_serial(rec, mem_ctx, pnn, nodemap, vnnmap,
2195                                          dbmap);
2196         }
2197
2198         if (ret != 0) {
2199                 goto fail;
2200         }
2201
2202         /* Fetch known/available public IPs from each active node */
2203         ret = ctdb_reload_remote_public_ips(ctdb, nodemap);
2204         if (ret != 0) {
2205                 rec->need_takeover_run = true;
2206                 goto fail;
2207         }
2208
2209         do_takeover_run(rec, nodemap, false);
2210
2211         /* execute the "recovered" event script on all nodes */
2212         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
2213         if (ret!=0) {
2214                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
2215                 goto fail;
2216         }
2217
2218         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
2219
2220         /* send a message to all clients telling them that the cluster 
2221            has been reconfigured */
2222         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2223                                        CTDB_SRVID_RECONFIGURE, tdb_null);
2224         if (ret != 0) {
2225                 DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
2226                 goto fail;
2227         }
2228
2229         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
2230
2231         rec->need_recovery = false;
2232         ctdb_op_end(rec->recovery);
2233
2234         /* we managed to complete a full recovery, make sure to forgive
2235            any past sins by the nodes that could now participate in the
2236            recovery.
2237         */
2238         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
2239         for (i=0;i<nodemap->num;i++) {
2240                 struct ctdb_banning_state *ban_state;
2241
2242                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2243                         continue;
2244                 }
2245
2246                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
2247                 if (ban_state == NULL) {
2248                         continue;
2249                 }
2250
2251                 ban_state->count = 0;
2252         }
2253
2254         /* We just finished a recovery successfully.
2255            We now wait for rerecovery_timeout before we allow
2256            another recovery to take place.
2257         */
2258         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
2259         ctdb_op_disable(rec->recovery, ctdb->ev,
2260                         ctdb->tunable.rerecovery_timeout);
2261         return 0;
2262
2263 fail:
2264         ctdb_op_end(rec->recovery);
2265         return -1;
2266 }
2267
2268
2269 /*
2270   elections are won by first checking the number of connected nodes, then
2271   the priority time, then the pnn
2272  */
2273 struct election_message {
2274         uint32_t num_connected;
2275         struct timeval priority_time;
2276         uint32_t pnn;
2277         uint32_t node_flags;
2278 };
2279
2280 /*
2281   form this nodes election data
2282  */
2283 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2284 {
2285         int ret, i;
2286         struct ctdb_node_map_old *nodemap;
2287         struct ctdb_context *ctdb = rec->ctdb;
2288
2289         ZERO_STRUCTP(em);
2290
2291         em->pnn = rec->ctdb->pnn;
2292         em->priority_time = rec->priority_time;
2293
2294         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2295         if (ret != 0) {
2296                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
2297                 return;
2298         }
2299
2300         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2301         em->node_flags = rec->node_flags;
2302
2303         for (i=0;i<nodemap->num;i++) {
2304                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2305                         em->num_connected++;
2306                 }
2307         }
2308
2309         /* we shouldn't try to win this election if we can't be a recmaster */
2310         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2311                 em->num_connected = 0;
2312                 em->priority_time = timeval_current();
2313         }
2314
2315         talloc_free(nodemap);
2316 }
2317
2318 /*
2319   see if we win the election against the given election data
2320  */
2321 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2322 {
2323         struct election_message myem;
2324         int cmp = 0;
2325
2326         ctdb_election_data(rec, &myem);
2327
2328         /* we can't win if we don't have the recmaster capability */
2329         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2330                 return false;
2331         }
2332
2333         /* we can't win if we are banned */
2334         if (rec->node_flags & NODE_FLAGS_BANNED) {
2335                 return false;
2336         }
2337
2338         /* we can't win if we are stopped */
2339         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2340                 return false;
2341         }
2342
2343         /* we will automatically win if the other node is banned */
2344         if (em->node_flags & NODE_FLAGS_BANNED) {
2345                 return true;
2346         }
2347
2348         /* we will automatically win if the other node is stopped */
2349         if (em->node_flags & NODE_FLAGS_STOPPED) {
2350                 return true;
2351         }
2352
        /* try to use the most connected node */
        if (cmp == 0) {
                cmp = (int)myem.num_connected - (int)em->num_connected;
        }
2353         /* then the longest running node */
2354         if (cmp == 0) {
2355                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2356         }
2357
2358         if (cmp == 0) {
2359                 cmp = (int)myem.pnn - (int)em->pnn;
2360         }
2361
2362         return cmp > 0;
2363 }
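
/* Worked example (illustrative values): a node with num_connected == 4
 * beats one with num_connected == 3 outright; on a tie, the node with
 * the older priority_time (the longest-running recovery daemon) wins;
 * if that also ties, the higher pnn wins.
 */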
2364
2365 /*
2366   send out an election request
2367  */
2368 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
2369 {
2370         int ret;
2371         TDB_DATA election_data;
2372         struct election_message emsg;
2373         uint64_t srvid;
2374         struct ctdb_context *ctdb = rec->ctdb;
2375
2376         srvid = CTDB_SRVID_ELECTION;
2377
2378         ctdb_election_data(rec, &emsg);
2379
2380         election_data.dsize = sizeof(struct election_message);
2381         election_data.dptr  = (unsigned char *)&emsg;
2382
2383
2384         /* first we assume we will win the election and set 
2385            recoverymaster to be ourself on the current node
2386          */
2387         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2388         if (ret != 0) {
2389                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2390                 return -1;
2391         }
2392
2393
2394         /* send an election message to all active nodes */
2395         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2396         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2397 }
2398
2399 /*
2400   this function will unban all nodes in the cluster
2401 */
2402 static void unban_all_nodes(struct ctdb_context *ctdb)
2403 {
2404         int ret, i;
2405         struct ctdb_node_map_old *nodemap;
2406         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2407         
2408         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2409         if (ret != 0) {
2410                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2411                 return;
2412         }
2413
2414         for (i=0;i<nodemap->num;i++) {
2415                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2416                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2417                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(),
2418                                                  nodemap->nodes[i].pnn, 0,
2419                                                  NODE_FLAGS_BANNED);
2420                         if (ret != 0) {
2421                                 DEBUG(DEBUG_ERR, (__location__ " failed to reset ban state\n"));
2422                         }
2423                 }
2424         }
2425
2426         talloc_free(tmp_ctx);
2427 }
2428
2429
2430 /*
2431   we think we are winning the election - send a broadcast election request
2432  */
2433 static void election_send_request(struct tevent_context *ev,
2434                                   struct tevent_timer *te,
2435                                   struct timeval t, void *p)
2436 {
2437         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2438         int ret;
2439
2440         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
2441         if (ret != 0) {
2442                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2443         }
2444
2445         TALLOC_FREE(rec->send_election_te);
2446 }
2447
2448 /*
2449   handler for memory dumps
2450 */
2451 static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2452 {
2453         struct ctdb_recoverd *rec = talloc_get_type(
2454                 private_data, struct ctdb_recoverd);
2455         struct ctdb_context *ctdb = rec->ctdb;
2456         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2457         TDB_DATA *dump;
2458         int ret;
2459         struct ctdb_srvid_message *rd;
2460
2461         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
2462                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2463                 talloc_free(tmp_ctx);
2464                 return;
2465         }
2466         rd = (struct ctdb_srvid_message *)data.dptr;
2467
2468         dump = talloc_zero(tmp_ctx, TDB_DATA);
2469         if (dump == NULL) {
2470                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2471                 talloc_free(tmp_ctx);
2472                 return;
2473         }
2474         ret = ctdb_dump_memory(ctdb, dump);
2475         if (ret != 0) {
2476                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2477                 talloc_free(tmp_ctx);
2478                 return;
2479         }
2480
2481         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
2482
2483         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2484         if (ret != 0) {
2485                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2486                 talloc_free(tmp_ctx);
2487                 return;
2488         }
2489
2490         talloc_free(tmp_ctx);
2491 }
2492
2493 /*
2494   handler for reload_nodes
2495 */
2496 static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
2497                                  void *private_data)
2498 {
2499         struct ctdb_recoverd *rec = talloc_get_type(
2500                 private_data, struct ctdb_recoverd);
2501
2502         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2503
2504         ctdb_load_nodes_file(rec->ctdb);
2505 }
2506
2507
2508 static void ctdb_rebalance_timeout(struct tevent_context *ev,
2509                                    struct tevent_timer *te,
2510                                    struct timeval t, void *p)
2511 {
2512         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2513
2514         if (rec->force_rebalance_nodes == NULL) {
2515                 DEBUG(DEBUG_ERR,
2516                       ("Rebalance timeout occurred - no nodes to rebalance\n"));
2517                 return;
2518         }
2519
2520         DEBUG(DEBUG_NOTICE,
2521               ("Rebalance timeout occurred - trigger takeover run\n"));
2522         rec->need_takeover_run = true;
2523 }
2524
2525
2526 static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
2527                                         void *private_data)
2528 {
2529         struct ctdb_recoverd *rec = talloc_get_type(
2530                 private_data, struct ctdb_recoverd);
2531         struct ctdb_context *ctdb = rec->ctdb;
2532         uint32_t pnn;
2533         uint32_t *t;
2534         int len;
2535         uint32_t deferred_rebalance;
2536
2537         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
2538                 return;
2539         }
2540
2541         if (data.dsize != sizeof(uint32_t)) {
2542                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2543                 return;
2544         }
2545
2546         pnn = *(uint32_t *)&data.dptr[0];
2547
2548         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
2549
2550         /* Copy any existing list of nodes.  There's probably some
2551          * sort of realloc variant that will do this but we need to
2552          * make sure that freeing the old array also cancels the timer
2553          * event for the timeout... not sure if realloc will do that.
2554          */
2555         len = (rec->force_rebalance_nodes != NULL) ?
2556                 talloc_array_length(rec->force_rebalance_nodes) :
2557                 0;
2558
2559         /* This allows duplicates to be added but they don't cause
2560          * harm.  A call to add a duplicate PNN arguably means that
2561          * the timeout should be reset, so this is the simplest
2562          * solution.
2563          */
2564         t = talloc_zero_array(rec, uint32_t, len+1);
2565         CTDB_NO_MEMORY_VOID(ctdb, t);
2566         if (len > 0) {
2567                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
2568         }
2569         t[len] = pnn;
2570
2571         talloc_free(rec->force_rebalance_nodes);
2572
2573         rec->force_rebalance_nodes = t;
2574
2575         /* If configured, setup a deferred takeover run to make sure
2576          * that certain nodes get IPs rebalanced to them.  This will
2577          * be cancelled if a successful takeover run happens before
2578          * the timeout.  Assign tunable value to variable for
2579          * readability.
2580          */
2581         deferred_rebalance = ctdb->tunable.deferred_rebalance_on_node_add;
2582         if (deferred_rebalance != 0) {
2583                 tevent_add_timer(ctdb->ev, rec->force_rebalance_nodes,
2584                                  timeval_current_ofs(deferred_rebalance, 0),
2585                                  ctdb_rebalance_timeout, rec);
2586         }
2587 }
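
/* Sketch of the sending side of this message (an assumption, modelled on
 * the "ctdb rebalancenode" tool command): the payload is just the target
 * node's pnn as a uint32_t, sent to the recovery master.
 */
#if 0
static int request_node_rebalance(struct ctdb_context *ctdb,
                                  uint32_t recmaster_pnn,
                                  uint32_t target_pnn)
{
        TDB_DATA data;

        data.dptr  = (uint8_t *)&target_pnn;
        data.dsize = sizeof(target_pnn);

        return ctdb_client_send_message(ctdb, recmaster_pnn,
                                        CTDB_SRVID_REBALANCE_NODE, data);
}
#endif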
2588
2589
2590
2591 static void recd_update_ip_handler(uint64_t srvid, TDB_DATA data,
2592                                    void *private_data)
2593 {
2594         struct ctdb_recoverd *rec = talloc_get_type(
2595                 private_data, struct ctdb_recoverd);
2596         struct ctdb_public_ip *ip;
2597
2598         if (rec->recmaster != rec->ctdb->pnn) {
2599                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2600                 return;
2601         }
2602
2603         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2604                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2605                 return;
2606         }
2607
2608         ip = (struct ctdb_public_ip *)data.dptr;
2609
2610         update_ip_assignment_tree(rec->ctdb, ip);
2611 }
2612
2613 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
2614                                     TDB_DATA data,
2615                                     struct ctdb_op_state *op_state)
2616 {
2617         struct ctdb_disable_message *r;
2618         uint32_t timeout;
2619         TDB_DATA result;
2620         int32_t ret = 0;
2621
2622         /* Validate input data */
2623         if (data.dsize != sizeof(struct ctdb_disable_message)) {
2624                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2625                                  "expecting %lu\n", (long unsigned)data.dsize,
2626                                  (long unsigned)sizeof(struct ctdb_disable_message)));
2627                 return;
2628         }
2629         if (data.dptr == NULL) {
2630                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2631                 return;
2632         }
2633
2634         r = (struct ctdb_disable_message *)data.dptr;
2635         timeout = r->timeout;
2636
2637         ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
2638         if (ret != 0) {
2639                 goto done;
2640         }
2641
2642         /* Returning our PNN tells the caller that we succeeded */
2643         ret = ctdb_get_pnn(ctdb);
2644 done:
2645         result.dsize = sizeof(int32_t);
2646         result.dptr  = (uint8_t *)&ret;
2647         srvid_request_reply(ctdb, (struct ctdb_srvid_message *)r, result);
2648 }
2649
2650 static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
2651                                           void *private_data)
2652 {
2653         struct ctdb_recoverd *rec = talloc_get_type(
2654                 private_data, struct ctdb_recoverd);
2655
2656         srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
2657 }
2658
2659 /* Backward compatibility for this SRVID */
2660 static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
2661                                      void *private_data)
2662 {
2663         struct ctdb_recoverd *rec = talloc_get_type(
2664                 private_data, struct ctdb_recoverd);
2665         uint32_t timeout;
2666
2667         if (data.dsize != sizeof(uint32_t)) {
2668                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2669                                  "expecting %lu\n", (long unsigned)data.dsize,
2670                                  (long unsigned)sizeof(uint32_t)));
2671                 return;
2672         }
2673         if (data.dptr == NULL) {
2674                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2675                 return;
2676         }
2677
2678         timeout = *((uint32_t *)data.dptr);
2679
2680         ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
2681 }
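
/* Client-side sketch for these disable messages (an assumption,
 * mirroring how do_takeover_run() above sends them): timeout is in
 * seconds, a timeout of 0 re-enables, and srvid == 0 means no reply is
 * wanted.
 */
#if 0
static int send_disable_takeover_runs(struct ctdb_context *ctdb,
                                      uint32_t dest_pnn, uint32_t timeout)
{
        struct ctdb_disable_message dtr;
        TDB_DATA data;

        dtr.pnn     = ctdb_get_pnn(ctdb); /* reply address; unused here */
        dtr.srvid   = 0;                  /* no reply requested */
        dtr.timeout = timeout;

        data.dptr  = (uint8_t *)&dtr;
        data.dsize = sizeof(dtr);

        return ctdb_client_send_message(ctdb, dest_pnn,
                                        CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
                                        data);
}
#endif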
2682
2683 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
2684                                        void *private_data)
2685 {
2686         struct ctdb_recoverd *rec = talloc_get_type(
2687                 private_data, struct ctdb_recoverd);
2688
2689         srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
2690 }
2691
2692 /*
2693   handler for ip reallocate, just add it to the list of requests and 
2694   handle this later in the monitor_cluster loop so we do not recurse
2695   with other requests to takeover_run()
2696 */
2697 static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
2698                                   void *private_data)
2699 {
2700         struct ctdb_srvid_message *request;
2701         struct ctdb_recoverd *rec = talloc_get_type(
2702                 private_data, struct ctdb_recoverd);
2703
2704         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
2705                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2706                 return;
2707         }
2708
2709         request = (struct ctdb_srvid_message *)data.dptr;
2710
2711         srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
2712 }
2713
2714 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2715                                           struct ctdb_recoverd *rec)
2716 {
2717         TDB_DATA result;
2718         int32_t ret;
2719         struct srvid_requests *current;
2720
2721         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2722
2723         /* Only process requests that are currently pending.  More
2724          * might come in while the takeover run is in progress and
2725          * they will need to be processed later since they might
2726          * be in response to flag changes.
2727          */
2728         current = rec->reallocate_requests;
2729         rec->reallocate_requests = NULL;
2730
2731         /* update the list of public ips that a node can handle for
2732            all connected nodes
2733         */
2734         ret = ctdb_reload_remote_public_ips(ctdb, rec->nodemap);
2735         if (ret != 0) {
2736                 rec->need_takeover_run = true;
2737         }
2738         if (ret == 0) {
2739                 if (do_takeover_run(rec, rec->nodemap, false)) {
2740                         ret = ctdb_get_pnn(ctdb);
2741                 } else {
2742                         ret = -1;
2743                 }
2744         }
2745
2746         result.dsize = sizeof(int32_t);
2747         result.dptr  = (uint8_t *)&ret;
2748
2749         srvid_requests_reply(ctdb, &current, result);
2750 }
2751
2752
2753 /*
2754   handler for recovery master elections
2755 */
2756 static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2757 {
2758         struct ctdb_recoverd *rec = talloc_get_type(
2759                 private_data, struct ctdb_recoverd);
2760         struct ctdb_context *ctdb = rec->ctdb;
2761         int ret;
2762         struct election_message *em = (struct election_message *)data.dptr;
2763
2764         /* Ignore election packets from ourself */
2765         if (ctdb->pnn == em->pnn) {
2766                 return;
2767         }
2768
2769         /* we got an election packet - update the timeout for the election */
2770         talloc_free(rec->election_timeout);
2771         rec->election_timeout = tevent_add_timer(
2772                         ctdb->ev, ctdb,
2773                         fast_start ?
2774                                 timeval_current_ofs(0, 500000) :
2775                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2776                         ctdb_election_timeout, rec);
2777
2778         /* someone called an election. check their election data
2779            and if we disagree and we would rather be the elected node, 
2780            send a new election message to all other nodes
2781          */
2782         if (ctdb_election_win(rec, em)) {
2783                 if (!rec->send_election_te) {
2784                         rec->send_election_te = tevent_add_timer(
2785                                         ctdb->ev, rec,
2786                                         timeval_current_ofs(0, 500000),
2787                                         election_send_request, rec);
2788                 }
2789                 /*unban_all_nodes(ctdb);*/
2790                 return;
2791         }
2792
2793         /* we didn't win */
2794         TALLOC_FREE(rec->send_election_te);
2795
2796         /* Release the recovery lock file */
2797         if (ctdb_recovery_have_lock(ctdb)) {
2798                 ctdb_recovery_unlock(ctdb);
2799                 unban_all_nodes(ctdb);
2800         }
2801
2802         clear_ip_assignment_tree(ctdb);
2803
2804         /* ok, let that guy become recmaster then */
2805         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2806         if (ret != 0) {
2807                 DEBUG(DEBUG_ERR, (__location__ " failed to set new recmaster\n"));
2808                 return;
2809         }
2810
2811         return;
2812 }
2813
2814
2815 /*
2816   force the start of the election process
2817  */
2818 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2819                            struct ctdb_node_map_old *nodemap)
2820 {
2821         int ret;
2822         struct ctdb_context *ctdb = rec->ctdb;
2823
2824         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2825
2826         /* set all nodes to recovery mode to stop all internode traffic */
2827         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, false);
2828         if (ret != 0) {
2829                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2830                 return;
2831         }
2832
2833         talloc_free(rec->election_timeout);
2834         rec->election_timeout = tevent_add_timer(
2835                         ctdb->ev, ctdb,
2836                         fast_start ?
2837                                 timeval_current_ofs(0, 500000) :
2838                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2839                         ctdb_election_timeout, rec);
2840
2841         ret = send_election_request(rec, pnn);
2842         if (ret!=0) {
2843                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2844                 return;
2845         }
2846
2847         /* wait for a few seconds to collect all responses */
2848         ctdb_wait_election(rec);
2849 }
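
/* force_election() is the single entry point used by main_loop() below
 * whenever the recmaster is unknown (-1), lacks CTDB_CAP_RECMASTER, or
 * has dropped out of the nodemap or become inactive.  The ordering is
 * deliberate: freeze the cluster first (recovery mode active stops all
 * internode traffic), then broadcast the election request, and finally
 * wait in ctdb_wait_election() to collect the responses.
 */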
2850
2851
2852
2853 /*
2854   handler for when a node changes its flags
2855 */
2856 static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2857 {
2858         struct ctdb_recoverd *rec = talloc_get_type(
2859                 private_data, struct ctdb_recoverd);
2860         struct ctdb_context *ctdb = rec->ctdb;
2861         int ret;
2862         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2863         struct ctdb_node_map_old *nodemap=NULL;
2864         TALLOC_CTX *tmp_ctx;
2865         int i;
2866         int disabled_flag_changed;
2867
2868         if (data.dsize != sizeof(*c)) {
2869                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2870                 return;
2871         }
2872
2873         tmp_ctx = talloc_new(ctdb);
2874         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2875
2876         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2877         if (ret != 0) {
2878                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2879                 talloc_free(tmp_ctx);
2880                 return;         
2881         }
2882
2883
2884         for (i=0;i<nodemap->num;i++) {
2885                 if (nodemap->nodes[i].pnn == c->pnn) break;
2886         }
2887
2888         if (i == nodemap->num) {
2889                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2890                 talloc_free(tmp_ctx);
2891                 return;
2892         }
2893
2894         if (c->old_flags != c->new_flags) {
2895                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2896         }
2897
2898         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2899
2900         nodemap->nodes[i].flags = c->new_flags;
2901
2902         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2903                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2904
2905         if (ret == 0) {
2906                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2907                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2908         }
2909         
2910         if (ret == 0 &&
2911             ctdb->recovery_master == ctdb->pnn &&
2912             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2913                 /* Only do the takeover run if the permanently disabled or
2914                    unhealthy flags changed, since these cause an ip failover
2915                    but not a recovery.
2916                    If the node became disconnected or banned, that will also
2917                    lead to an ip address failover, but that is handled
2918                    during recovery.
2919                 */
2920                 if (disabled_flag_changed) {
2921                         rec->need_takeover_run = true;
2922                 }
2923         }
2924
2925         talloc_free(tmp_ctx);
2926 }
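
/* The XOR above isolates flag transitions:
 *
 *      (current_flags ^ new_flags) & NODE_FLAGS_DISABLED
 *
 * is non-zero exactly when an unhealthy/permanently-disabled bit
 * toggled, so a takeover run is only queued for those transitions, and
 * only when this node is the recmaster and the cluster is in normal
 * recovery mode.  Disconnect and ban transitions are intentionally
 * left to the recovery path instead.
 */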
2927
2928 /*
2929   handler for when we need to push out flag changes to all other nodes
2930 */
2931 static void push_flags_handler(uint64_t srvid, TDB_DATA data,
2932                                void *private_data)
2933 {
2934         struct ctdb_recoverd *rec = talloc_get_type(
2935                 private_data, struct ctdb_recoverd);
2936         struct ctdb_context *ctdb = rec->ctdb;
2937         int ret;
2938         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2939         struct ctdb_node_map_old *nodemap=NULL;
2940         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2941         uint32_t recmaster;
2942         uint32_t *nodes;
2943
2944         /* find the recovery master */
2945         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2946         if (ret != 0) {
2947                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2948                 talloc_free(tmp_ctx);
2949                 return;
2950         }
2951
2952         /* read the node flags from the recmaster */
2953         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2954         if (ret != 0) {
2955                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", recmaster));
2956                 talloc_free(tmp_ctx);
2957                 return;
2958         }
2959         if (c->pnn >= nodemap->num) {
2960                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %u\n", c->pnn));
2961                 talloc_free(tmp_ctx);
2962                 return;
2963         }
2964
2965         /* send the flags update to all connected nodes */
2966         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2967
2968         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2969                                       nodes, 0, CONTROL_TIMEOUT(),
2970                                       false, data,
2971                                       NULL, NULL,
2972                                       NULL) != 0) {
2973                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2974
2975                 talloc_free(tmp_ctx);
2976                 return;
2977         }
2978
2979         talloc_free(tmp_ctx);
2980 }
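
/* Note that the recmaster's nodemap is fetched above only to validate
 * c->pnn and to enumerate the connected nodes; the flag change itself
 * is forwarded unmodified, since 'data' is passed straight through to
 * the MODIFY_FLAGS broadcast.
 */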
2981
2982
2983 struct verify_recmode_normal_data {
2984         uint32_t count;
2985         enum monitor_result status;
2986 };
2987
2988 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2989 {
2990         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2991
2992
2993         /* one more node has responded with recmode data*/
2994         rmdata->count--;
2995
2996         /* if we failed to get the recmode, then return an error and let
2997            the main loop try again.
2998         */
2999         if (state->state != CTDB_CONTROL_DONE) {
3000                 if (rmdata->status == MONITOR_OK) {
3001                         rmdata->status = MONITOR_FAILED;
3002                 }
3003                 return;
3004         }
3005
3006         /* if we got a response, then the recmode will be stored in the
3007            status field
3008         */
3009         if (state->status != CTDB_RECOVERY_NORMAL) {
3010                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
3011                 rmdata->status = MONITOR_RECOVERY_NEEDED;
3012         }
3013
3014         return;
3015 }
3016
3017
3018 /* verify that all nodes are in normal recovery mode */
3019 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap)
3020 {
3021         struct verify_recmode_normal_data *rmdata;
3022         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3023         struct ctdb_client_control_state *state;
3024         enum monitor_result status;
3025         int j;
3026         
3027         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
3028         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3029         rmdata->count  = 0;
3030         rmdata->status = MONITOR_OK;
3031
3032         /* loop over all active nodes and send an async getrecmode call to 
3033            them*/
3034         for (j=0; j<nodemap->num; j++) {
3035                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3036                         continue;
3037                 }
3038                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
3039                                         CONTROL_TIMEOUT(), 
3040                                         nodemap->nodes[j].pnn);
3041                 if (state == NULL) {
3042                         /* we failed to send the control, treat this as 
3043                            an error and try again next iteration
3044                         */                      
3045                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
3046                         talloc_free(mem_ctx);
3047                         return MONITOR_FAILED;
3048                 }
3049
3050                 /* set up the callback functions */
3051                 state->async.fn = verify_recmode_normal_callback;
3052                 state->async.private_data = rmdata;
3053
3054                 /* one more control to wait for to complete */
3055                 rmdata->count++;
3056         }
3057
3058
3059         /* now wait for up to the maximum number of seconds allowed
3060            or until all nodes we expect a response from have replied
3061         */
3062         while (rmdata->count > 0) {
3063                 tevent_loop_once(ctdb->ev);
3064         }
3065
3066         status = rmdata->status;
3067         talloc_free(mem_ctx);
3068         return status;
3069 }
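
/* verify_recmode() above and verify_recmaster() below share the same
 * async fan-out pattern:
 *
 *      for each active node:   send control, rmdata->count++
 *      per-node callback:      rmdata->count--, fold result into status
 *      while (count > 0):      tevent_loop_once(ctdb->ev)
 *
 * Each control carries CONTROL_TIMEOUT(), so a node that never answers
 * should complete its control with an error rather than leave this
 * loop spinning forever.
 */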
3070
3071
3072 struct verify_recmaster_data {
3073         struct ctdb_recoverd *rec;
3074         uint32_t count;
3075         uint32_t pnn;
3076         enum monitor_result status;
3077 };
3078
3079 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
3080 {
3081         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
3082
3083
3084         /* one more node has responded with recmaster data*/
3085         rmdata->count--;
3086
3087         /* if we failed to get the recmaster, then return an error and let
3088            the main loop try again.
3089         */
3090         if (state->state != CTDB_CONTROL_DONE) {
3091                 if (rmdata->status == MONITOR_OK) {
3092                         rmdata->status = MONITOR_FAILED;
3093                 }
3094                 return;
3095         }
3096
3097         /* if we got a response, then the recmaster will be stored in the
3098            status field
3099         */
3100         if (state->status != rmdata->pnn) {
3101                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
3102                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
3103                 rmdata->status = MONITOR_ELECTION_NEEDED;
3104         }
3105
3106         return;
3107 }
3108
3109
3110 /* verify that all nodes agree that we are the recmaster */
3111 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, uint32_t pnn)
3112 {
3113         struct ctdb_context *ctdb = rec->ctdb;
3114         struct verify_recmaster_data *rmdata;
3115         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3116         struct ctdb_client_control_state *state;
3117         enum monitor_result status;
3118         int j;
3119         
3120         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
3121         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3122         rmdata->rec    = rec;
3123         rmdata->count  = 0;
3124         rmdata->pnn    = pnn;
3125         rmdata->status = MONITOR_OK;
3126
3127         /* loop over all active nodes and send an async getrecmaster call to 
3128            them*/
3129         for (j=0; j<nodemap->num; j++) {
3130                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3131                         continue;
3132                 }
3133                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
3134                                         CONTROL_TIMEOUT(),
3135                                         nodemap->nodes[j].pnn);
3136                 if (state == NULL) {
3137                         /* we failed to send the control, treat this as 
3138                            an error and try again next iteration
3139                         */                      
3140                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
3141                         talloc_free(mem_ctx);
3142                         return MONITOR_FAILED;
3143                 }
3144
3145                 /* set up the callback functions */
3146                 state->async.fn = verify_recmaster_callback;
3147                 state->async.private_data = rmdata;
3148
3149                 /* one more control to wait for to complete */
3150                 rmdata->count++;
3151         }
3152
3153
3154         /* now wait for up to the maximum number of seconds allowed
3155            or until all nodes we expect a response from have replied
3156         */
3157         while (rmdata->count > 0) {
3158                 tevent_loop_once(ctdb->ev);
3159         }
3160
3161         status = rmdata->status;
3162         talloc_free(mem_ctx);
3163         return status;
3164 }
3165
3166 static bool interfaces_have_changed(struct ctdb_context *ctdb,
3167                                     struct ctdb_recoverd *rec)
3168 {
3169         struct ctdb_iface_list_old *ifaces = NULL;
3170         TALLOC_CTX *mem_ctx;
3171         bool ret = false;
3172
3173         mem_ctx = talloc_new(NULL);
3174
3175         /* Read the interfaces from the local node */
3176         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
3177                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
3178                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
3179                 /* We could return an error.  However, this will be
3180                  * rare so we'll decide that the interfaces have
3181                  * actually changed, just in case.
3182                  */
3183                 talloc_free(mem_ctx);
3184                 return true;
3185         }
3186
3187         if (!rec->ifaces) {
3188                 /* We haven't been here before so things have changed */
3189                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
3190                 ret = true;
3191         } else if (rec->ifaces->num != ifaces->num) {
3192                 /* Number of interfaces has changed */
3193                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3194                                      rec->ifaces->num, ifaces->num));
3195                 ret = true;
3196         } else {
3197                 /* See if interface names or link states have changed */
3198                 int i;
3199                 for (i = 0; i < rec->ifaces->num; i++) {
3200                         struct ctdb_iface *iface = &rec->ifaces->ifaces[i];
3201                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3202                                 DEBUG(DEBUG_NOTICE,
3203                                       ("Interface in slot %d changed: %s => %s\n",
3204                                        i, iface->name, ifaces->ifaces[i].name));
3205                                 ret = true;
3206                                 break;
3207                         }
3208                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3209                                 DEBUG(DEBUG_NOTICE,
3210                                       ("Interface %s changed state: %d => %d\n",
3211                                        iface->name, iface->link_state,
3212                                        ifaces->ifaces[i].link_state));
3213                                 ret = true;
3214                                 break;
3215                         }
3216                 }
3217         }
3218
3219         talloc_free(rec->ifaces);
3220         rec->ifaces = talloc_steal(rec, ifaces);
3221
3222         talloc_free(mem_ctx);
3223         return ret;
3224 }
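
/* The comparison above is positional - slot i of the cached list is
 * matched against slot i of the fresh list - so a mere reordering of
 * otherwise identical interfaces is reported as a change.  Like the
 * error path above, this errs on the side of a spurious takeover run
 * rather than a missed one.
 */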
3225
3226 /* called to check that the local allocation of public ip addresses is ok.
3227 */
3228 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map_old *nodemap)
3229 {
3230         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3231         int ret, j;
3232         bool need_takeover_run = false;
3233
3234         if (interfaces_have_changed(ctdb, rec)) {
3235                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
3236                                      "local node %u - force takeover run\n",
3237                                      pnn));
3238                 need_takeover_run = true;
3239         }
3240
3241         /* verify that we have the ip addresses we should have
3242            and that we don't have ones we shouldn't have.
3243            if we find an inconsistency we set recmode to
3244            active on the local node and wait for the recmaster
3245            to do a full blown recovery.
3246            also if the pnn is -1 and we are healthy and can host the ip
3247            we also request an ip reallocation.
3248         */
3249         if (ctdb->tunable.disable_ip_failover == 0) {
3250                 struct ctdb_public_ip_list_old *ips = NULL;
3251
3252                 /* read the *available* IPs from the local node */
3253                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3254                 if (ret != 0) {
3255                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3256                         talloc_free(mem_ctx);
3257                         return -1;
3258                 }
3259
3260                 for (j=0; j<ips->num; j++) {
3261                         if (ips->ips[j].pnn == -1 &&
3262                             nodemap->nodes[pnn].flags == 0) {
3263                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3264                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3265                                 need_takeover_run = true;
3266                         }
3267                 }
3268
3269                 talloc_free(ips);
3270
3271                 /* read the *known* IPs from the local node */
3272                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3273                 if (ret != 0) {
3274                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3275                         talloc_free(mem_ctx);
3276                         return -1;
3277                 }
3278
3279                 for (j=0; j<ips->num; j++) {
3280                         if (ips->ips[j].pnn == pnn) {
3281                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3282                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3283                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3284                                         need_takeover_run = true;
3285                                 }
3286                         } else {
3287                                 if (ctdb->do_checkpublicip &&
3288                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3289
3290                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3291                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3292
3293                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3294                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3295                                         }
3296                                 }
3297                         }
3298                 }
3299         }
3300
3301         if (need_takeover_run) {
3302                 struct ctdb_srvid_message rd;
3303                 TDB_DATA data;
3304
3305                 DEBUG(DEBUG_CRIT,("Trigger takeover run\n"));
3306
3307                 rd.pnn = ctdb->pnn;
3308                 rd.srvid = 0;
3309                 data.dptr = (uint8_t *)&rd;
3310                 data.dsize = sizeof(rd);
3311
3312                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3313                 if (ret != 0) {
3314                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster: %d\n", (int)rec->recmaster));
3315                 }
3316         }
3317         talloc_free(mem_ctx);
3318         return 0;
3319 }
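
/* verify_local_ip_allocation() makes two passes over the public IP
 * lists: the *available* list catches addresses nobody serves that
 * this healthy node could take (pnn == -1), and the *known* list
 * catches the two inconsistent states - an address assigned to us but
 * missing from our interfaces, and an address we still hold but should
 * not.  Only the latter is repaired locally (by releasing the IP);
 * everything else is escalated to the recmaster as a takeover-run
 * request (rd.srvid = 0, and no reply is consumed here).
 */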
3320
3321
3322 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3323 {
3324         struct ctdb_node_map_old **remote_nodemaps = callback_data;
3325
3326         if (node_pnn >= ctdb->num_nodes) {
3327                 DEBUG(DEBUG_ERR,(__location__ " nodemap from node with invalid pnn\n"));
3328                 return;
3329         }
3330
3331         remote_nodemaps[node_pnn] = (struct ctdb_node_map_old *)talloc_steal(remote_nodemaps, outdata.dptr);
3332
3333 }
3334
3335 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3336         struct ctdb_node_map_old *nodemap,
3337         struct ctdb_node_map_old **remote_nodemaps)
3338 {
3339         uint32_t *nodes;
3340
3341         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3342         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3343                                         nodes, 0,
3344                                         CONTROL_TIMEOUT(), false, tdb_null,
3345                                         async_getnodemap_callback,
3346                                         NULL,
3347                                         remote_nodemaps) != 0) {
3348                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3349
3350                 return -1;
3351         }
3352
3353         return 0;
3354 }
3355
3356 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3357 {
3358         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3359         const char *reclockfile;
3360
3361         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3362                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3363                 talloc_free(tmp_ctx);
3364                 return -1;      
3365         }
3366
3367         if (reclockfile == NULL) {
3368                 if (ctdb->recovery_lock_file != NULL) {
3369                         DEBUG(DEBUG_NOTICE,("Recovery lock file disabled\n"));
3370                         talloc_free(ctdb->recovery_lock_file);
3371                         ctdb->recovery_lock_file = NULL;
3372                         ctdb_recovery_unlock(ctdb);
3373                 }
3374                 talloc_free(tmp_ctx);
3375                 return 0;
3376         }
3377
3378         if (ctdb->recovery_lock_file == NULL) {
3379                 DEBUG(DEBUG_NOTICE,
3380                       ("Recovery lock file enabled (%s)\n", reclockfile));
3381                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3382                 ctdb_recovery_unlock(ctdb);
3383                 talloc_free(tmp_ctx);
3384                 return 0;
3385         }
3386
3387
3388         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3389                 talloc_free(tmp_ctx);
3390                 return 0;
3391         }
3392
3393         DEBUG(DEBUG_NOTICE,
3394               ("Recovery lock file changed (now %s)\n", reclockfile));
3395         talloc_free(ctdb->recovery_lock_file);
3396         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3397         ctdb_recovery_unlock(ctdb);
3398
3399         talloc_free(tmp_ctx);
3400         return 0;
3401 }
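
/* All three transitions handled above - reclock disabled, reclock
 * enabled, reclock path changed - end in ctdb_recovery_unlock().
 * Dropping any currently held lock on every configuration change means
 * the lock is only ever re-acquired against the file the daemon
 * currently advertises, never against a stale path.
 */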
3402
3403 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3404                       TALLOC_CTX *mem_ctx)
3405 {
3406         uint32_t pnn;
3407         struct ctdb_node_map_old *nodemap=NULL;
3408         struct ctdb_node_map_old *recmaster_nodemap=NULL;
3409         struct ctdb_node_map_old **remote_nodemaps=NULL;
3410         struct ctdb_vnn_map *vnnmap=NULL;
3411         struct ctdb_vnn_map *remote_vnnmap=NULL;
3412         uint32_t num_lmasters;
3413         int32_t debug_level;
3414         int i, j, ret;
3415         bool self_ban;
3416
3417
3418         /* verify that the main daemon is still running */
3419         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3420                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3421                 exit(-1);
3422         }
3423
3424         /* ping the local daemon to tell it we are alive */
3425         ctdb_ctrl_recd_ping(ctdb);
3426
3427         if (rec->election_timeout) {
3428                 /* an election is in progress */
3429                 return;
3430         }
3431
3432         /* read the debug level from the parent and update locally */
3433         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3434         if (ret !=0) {
3435                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3436                 return;
3437         }
3438         DEBUGLEVEL = debug_level;
3439
3440         /* get relevant tunables */
3441         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3442         if (ret != 0) {
3443                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3444                 return;
3445         }
3446
3447         /* get runstate */
3448         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
3449                                      CTDB_CURRENT_NODE, &ctdb->runstate);
3450         if (ret != 0) {
3451                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
3452                 return;
3453         }
3454
3455         /* get the current recovery lock file from the server */
3456         if (update_recovery_lock_file(ctdb) != 0) {
3457                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3458                 return;
3459         }
3460
3461         /* Make sure that if recovery lock verification becomes disabled,
3462            we close the file
3463         */
3464         if (ctdb->recovery_lock_file == NULL) {
3465                 ctdb_recovery_unlock(ctdb);
3466         }
3467
3468         pnn = ctdb_get_pnn(ctdb);
3469
3470         /* get the vnnmap */
3471         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3472         if (ret != 0) {
3473                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3474                 return;
3475         }
3476
3477
3478         /* get number of nodes */
3479         if (rec->nodemap) {
3480                 talloc_free(rec->nodemap);
3481                 rec->nodemap = NULL;
3482                 nodemap=NULL;
3483         }
3484         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3485         if (ret != 0) {
3486                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3487                 return;
3488         }
3489         nodemap = rec->nodemap;
3490
3491         /* remember our own node flags */
3492         rec->node_flags = nodemap->nodes[pnn].flags;
3493
3494         ban_misbehaving_nodes(rec, &self_ban);
3495         if (self_ban) {
3496                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3497                 return;
3498         }
3499
3500         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3501            also frozen and that the recmode is set to active.
3502         */
3503         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3504                 /* If this node has become inactive then we want to
3505                  * reduce the chances of it taking over the recovery
3506                  * master role when it becomes active again.  This
3507                  * helps to stabilise the recovery master role so that
3508                  * it stays on the most stable node.
3509                  */
3510                 rec->priority_time = timeval_current();
3511
3512                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3513                 if (ret != 0) {
3514                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3515                 }
3516                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3517                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3518
3519                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3520                         if (ret != 0) {
3521                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3522
3523                                 return;
3524                         }
3525                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3526                         if (ret != 0) {
3527                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3528                                 return;
3529                         }
3530                 }
3531
3532                 /* If this node is stopped or banned then it is not the recovery
3533                  * master, so don't do anything. This prevents a stopped or banned
3534                  * node from starting an election and sending unnecessary controls.
3535                  */
3536                 return;
3537         }
3538
3539         /* check which node is the recovery master */
3540         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3541         if (ret != 0) {
3542                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3543                 return;
3544         }
3545
3546         /* If we are not the recmaster then do some housekeeping */
3547         if (rec->recmaster != pnn) {
3548                 /* Ignore any IP reallocate requests - only recmaster
3549                  * processes them
3550                  */
3551                 TALLOC_FREE(rec->reallocate_requests);
3552                 /* Clear any nodes that should be force rebalanced in
3553                  * the next takeover run.  If the recovery master role
3554                  * has moved then we don't want to process these some
3555                  * time in the future.
3556                  */
3557                 TALLOC_FREE(rec->force_rebalance_nodes);
3558         }
3559
3560         /* This is a special case.  When the recovery daemon is started,
3561          * recmaster is set to -1.  If the node is not started in stopped
3562          * state, then start an election to decide the recovery master.
3563          */
3564         if (rec->recmaster == (uint32_t)-1) {
3565                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master not yet set - forcing election\n"));
3566                 force_election(rec, pnn, nodemap);
3567                 return;
3568         }
3569
3570         /* update the capabilities for all nodes */
3571         ret = update_capabilities(rec, nodemap);
3572         if (ret != 0) {
3573                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3574                 return;
3575         }
3576
3577         /*
3578          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3579          * but we have, then force an election and try to become the new
3580          * recmaster.
3581          */
3582         if (!ctdb_node_has_capabilities(rec->caps,
3583                                         rec->recmaster,
3584                                         CTDB_CAP_RECMASTER) &&
3585             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3586             !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3587                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3588                                   " but we (node %u) have - force an election\n",
3589                                   rec->recmaster, pnn));
3590                 force_election(rec, pnn, nodemap);
3591                 return;
3592         }
3593
3594         /* verify that the recmaster node is still active */
3595         for (j=0; j<nodemap->num; j++) {
3596                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3597                         break;
3598                 }
3599         }
3600
3601         if (j == nodemap->num) {
3602                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3603                 force_election(rec, pnn, nodemap);
3604                 return;
3605         }
3606
3607         /* if recovery master is disconnected we must elect a new recmaster */
3608         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3609                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3610                 force_election(rec, pnn, nodemap);
3611                 return;
3612         }
3613
3614         /* get nodemap from the recovery master to check if it is inactive */
3615         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3616                                    mem_ctx, &recmaster_nodemap);
3617         if (ret != 0) {
3618                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3619                           nodemap->nodes[j].pnn));
3620                 return;
3621         }
3622
3623
3624         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3625             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3626                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3627                 /*
3628                  * update our nodemap to carry the recmaster's notion of
3629                  * its own flags, so that we don't keep freezing the
3630                  * inactive recmaster node...
3631                  */
3632                 nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
3633                 force_election(rec, pnn, nodemap);
3634                 return;
3635         }
3636
3637         /* verify that we have all ip addresses we should have and we don't
3638          * have addresses we shouldn't have.
3639          */
3640         if (ctdb->tunable.disable_ip_failover == 0 &&
3641             !ctdb_op_is_disabled(rec->takeover_run)) {
3642                 if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3643                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3644                 }
3645         }
3646
3647
3648         /* if we are not the recmaster then we do not need to check
3649            if recovery is needed
3650          */
3651         if (pnn != rec->recmaster) {
3652                 return;
3653         }
3654
3655
3656         /* ensure our local copies of flags are right */
3657         ret = update_local_flags(rec, nodemap);
3658         if (ret == MONITOR_ELECTION_NEEDED) {
3659                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3660                 force_election(rec, pnn, nodemap);
3661                 return;
3662         }
3663         if (ret != MONITOR_OK) {
3664                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3665                 return;
3666         }
3667
3668         if (ctdb->num_nodes != nodemap->num) {
3669                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3670                 ctdb_load_nodes_file(ctdb);
3671                 return;
3672         }
3673
3674         /* verify that all active nodes agree that we are the recmaster */
3675         switch (verify_recmaster(rec, nodemap, pnn)) {
3676         case MONITOR_RECOVERY_NEEDED:
3677                 /* cannot happen */
3678                 return;
3679         case MONITOR_ELECTION_NEEDED:
3680                 force_election(rec, pnn, nodemap);
3681                 return;
3682         case MONITOR_OK:
3683                 break;
3684         case MONITOR_FAILED:
3685                 return;
3686         }
3687
3688
3689         if (rec->need_recovery) {
3690                 /* a previous recovery didn't finish */
3691                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3692                 return;
3693         }
3694
3695         /* verify that all active nodes are in normal mode 
3696            and not in recovery mode 
3697         */
3698         switch (verify_recmode(ctdb, nodemap)) {
3699         case MONITOR_RECOVERY_NEEDED:
3700                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3701                 return;
3702         case MONITOR_FAILED:
3703                 return;
3704         case MONITOR_ELECTION_NEEDED:
3705                 /* cannot happen */
3706         case MONITOR_OK:
3707                 break;
3708         }
3709
3710
3711         if (ctdb->recovery_lock_file != NULL) {
3712                 /* We must already hold the recovery lock */
3713                 if (!ctdb_recovery_have_lock(ctdb)) {
3714                         DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
3715                         ctdb_set_culprit(rec, ctdb->pnn);
3716                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3717                         return;
3718                 }
3719         }
3720
3721
3722         /* if there are takeover runs requested, perform them and notify the waiters */
3723         if (!ctdb_op_is_disabled(rec->takeover_run) &&
3724             rec->reallocate_requests) {
3725                 process_ipreallocate_requests(ctdb, rec);
3726         }
3727
3728         /* If recoveries are disabled then there is no use doing any
3729          * nodemap or flags checks.  Recoveries might be disabled due
3730          * to "reloadnodes", so doing these checks might cause an
3731          * unnecessary recovery.  */
3732         if (ctdb_op_is_disabled(rec->recovery)) {
3733                 return;
3734         }
3735
3736         /* get the nodemap for all active remote nodes
3737          */
3738         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map_old *, nodemap->num);
3739         if (remote_nodemaps == NULL) {
3740                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3741                 return;
3742         }
3743         for(i=0; i<nodemap->num; i++) {
3744                 remote_nodemaps[i] = NULL;
3745         }
3746         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3747                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3748                 return;
3749         } 
3750
3751         /* verify that all other nodes have the same nodemap as we have
3752         */
3753         for (j=0; j<nodemap->num; j++) {
3754                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3755                         continue;
3756                 }
3757
3758                 if (remote_nodemaps[j] == NULL) {
3759                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3760                         ctdb_set_culprit(rec, j);
3761
3762                         return;
3763                 }
3764
3765                 /* if the nodes disagree on how many nodes there are
3766                    then this is a good reason to try recovery
3767                  */
3768                 if (remote_nodemaps[j]->num != nodemap->num) {
3769                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3770                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3771                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3772                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3773                         return;
3774                 }
3775
3776                 /* if the nodes disagree on which nodes exist and are
3777                    active, then that is also a good reason to do recovery
3778                  */
3779                 for (i=0;i<nodemap->num;i++) {
3780                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3781                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3782                                           nodemap->nodes[j].pnn, i, 
3783                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3784                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3785                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3786                                             vnnmap);
3787                                 return;
3788                         }
3789                 }
3790         }
3791
3792         /*
3793          * Update node flags obtained from each active node. This ensures we
3794          * have up-to-date information for all the nodes.
3795          */
3796         for (j=0; j<nodemap->num; j++) {
3797                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3798                         continue;
3799                 }
3800                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3801         }
3802
3803         for (j=0; j<nodemap->num; j++) {
3804                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3805                         continue;
3806                 }
3807
3808                 /* verify the flags are consistent
3809                 */
3810                 for (i=0; i<nodemap->num; i++) {
3811                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3812                                 continue;
3813                         }
3814                         
3815                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3816                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3817                                   nodemap->nodes[j].pnn, 
3818                                   nodemap->nodes[i].pnn, 
3819                                   remote_nodemaps[j]->nodes[i].flags,
3820                                   nodemap->nodes[i].flags));
3821                                 if (i == j) {
3822                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3823                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3824                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3825                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3826                                                     vnnmap);
3827                                         return;
3828                                 } else {
3829                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3830                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3831                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3832                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3833                                                     vnnmap);
3834                                         return;
3835                                 }
3836                         }
3837                 }
3838         }
3839
3840
3841         /* count how many active nodes with the lmaster capability there are */
3842         num_lmasters  = 0;
3843         for (i=0; i<nodemap->num; i++) {
3844                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3845                         if (ctdb_node_has_capabilities(rec->caps,
3846                                                        ctdb->nodes[i]->pnn,
3847                                                        CTDB_CAP_LMASTER)) {
3848                                 num_lmasters++;
3849                         }
3850                 }
3851         }
3852
3853
3854         /* There must be the same number of lmasters in the vnn map as
3855          * there are active nodes with the lmaster capability...  or
3856          * do a recovery.
3857          */
3858         if (vnnmap->size != num_lmasters) {
3859                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
3860                           vnnmap->size, num_lmasters));
3861                 ctdb_set_culprit(rec, ctdb->pnn);
3862                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3863                 return;
3864         }
3865
3866         /* verify that all active nodes in the nodemap also exist in 
3867            the vnnmap.
3868          */
3869         for (j=0; j<nodemap->num; j++) {
3870                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3871                         continue;
3872                 }
3873                 if (nodemap->nodes[j].pnn == pnn) {
3874                         continue;
3875                 }
3876
3877                 for (i=0; i<vnnmap->size; i++) {
3878                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3879                                 break;
3880                         }
3881                 }
3882                 if (i == vnnmap->size) {
3883                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3884                                   nodemap->nodes[j].pnn));
3885                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3886                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3887                         return;
3888                 }
3889         }
3890
3891         
3892         /* verify that all other nodes have the same vnnmap
3893            and are from the same generation
3894          */
3895         for (j=0; j<nodemap->num; j++) {
3896                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3897                         continue;
3898                 }
3899                 if (nodemap->nodes[j].pnn == pnn) {
3900                         continue;
3901                 }
3902
3903                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3904                                           mem_ctx, &remote_vnnmap);
3905                 if (ret != 0) {
3906                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3907                                   nodemap->nodes[j].pnn));
3908                         return;
3909                 }
3910
3911                 /* verify the vnnmap generation is the same */
3912                 if (vnnmap->generation != remote_vnnmap->generation) {
3913                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3914                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3915                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3916                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3917                         return;
3918                 }
3919
3920                 /* verify the vnnmap size is the same */
3921                 if (vnnmap->size != remote_vnnmap->size) {
3922                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3923                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3924                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3925                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3926                         return;
3927                 }
3928
3929                 /* verify the vnnmap is the same */
3930                 for (i=0;i<vnnmap->size;i++) {
3931                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3932                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3933                                           nodemap->nodes[j].pnn));
3934                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3935                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3936                                             vnnmap);
3937                                 return;
3938                         }
3939                 }
3940         }
3941
3942         /* we might need to change who has what IP assigned */
3943         if (rec->need_takeover_run) {
3944                 /* update the list of public ips that a node can handle for
3945                    all connected nodes
3946                 */
3947                 ret = ctdb_reload_remote_public_ips(ctdb, nodemap);
3948                 if (ret != 0) {
3949                         return;
3950                 }
3951
3952                 /* If the takeover run fails, the offending nodes are
3953                  * assigned ban culprit counts and the takeover run is
3954                  * retried.  If it fails repeatedly, the offending node
3955                  * eventually gets banned.
3956                  */
3957                 do_takeover_run(rec, nodemap, true);
3958         }
3959 }
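
/* main_loop() is structured as a chain of checks that each either
 * return early (and retry on the next iteration), force an election,
 * or fall through to do_recovery().  Roughly: daemon liveness and
 * tunables first, then recmaster sanity (elections), then per-node
 * flag and nodemap agreement, then vnnmap size/generation/content
 * agreement, and only then an optional takeover run.  Any single
 * failed check aborts the iteration rather than acting on stale state.
 */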
3960
3961 /*
3962   the main monitoring loop
3963  */
3964 static void monitor_cluster(struct ctdb_context *ctdb)
3965 {
3966         struct ctdb_recoverd *rec;
3967
3968         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3969
3970         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3971         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3972
3973         rec->ctdb = ctdb;
3974
3975         rec->takeover_run = ctdb_op_init(rec, "takeover runs");
3976         CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
3977
3978         rec->recovery = ctdb_op_init(rec, "recoveries");
3979         CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);
3980
3981         rec->priority_time = timeval_current();
3982
3983         /* register a message port for sending memory dumps */
3984         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3985
3986         /* register a message port for recovery elections */
3987         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_ELECTION, election_handler, rec);
3988
3989         /* when nodes are disabled/enabled */
3990         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3991
3992         /* when we are asked to push out a flag change */
3993         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3994
3995         /* register a message port for vacuum fetch */
3996         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3997
3998         /* register a message port for reloadnodes  */
3999         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
4000
4001         /* register a message port for performing a takeover run */
4002         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
4003
4004         /* register a message port for disabling the ip check for a short while */
4005         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
4006
4007         /* register a message port for updating the recovery daemon's node assignment for an ip */
4008         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
4009
4010         /* register a message port for forcing a rebalance of a node at the
4011            next reallocation */
4012         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
4013
4014         /* Register a message port for disabling takeover runs */
4015         ctdb_client_set_message_handler(ctdb,
4016                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
4017                                         disable_takeover_runs_handler, rec);
4018
4019         /* Register a message port for disabling recoveries */
4020         ctdb_client_set_message_handler(ctdb,
4021                                         CTDB_SRVID_DISABLE_RECOVERIES,
4022                                         disable_recoveries_handler, rec);
4023
4024         /* register a message port for detaching database */
4025         ctdb_client_set_message_handler(ctdb,
4026                                         CTDB_SRVID_DETACH_DATABASE,
4027                                         detach_database_handler, rec);
4028
4029         for (;;) {
4030                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
4031                 struct timeval start;
4032                 double elapsed;
4033
4034                 if (!mem_ctx) {
4035                         DEBUG(DEBUG_CRIT,(__location__
4036                                           " Failed to create temp context\n"));
4037                         exit(-1);
4038                 }
4039
4040                 start = timeval_current();
4041                 main_loop(ctdb, rec, mem_ctx);
4042                 talloc_free(mem_ctx);
4043
4044                 /* we only check for recovery once every 'recover_interval' seconds */
4045                 elapsed = timeval_elapsed(&start);
4046                 if (elapsed < ctdb->tunable.recover_interval) {
4047                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
4048                                           - elapsed);
4049                 }
4050         }
4051 }
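
/* The monitoring loop above paces itself rather than using a periodic
 * timer: when an iteration of main_loop() completes in less than
 * ctdb->tunable.recover_interval seconds it sleeps for the remainder;
 * a pass that already took longer proceeds straight to the next
 * iteration.
 */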
4052
4053 /*
4054   event handler for when the main ctdbd dies
4055  */
4056 static void ctdb_recoverd_parent(struct tevent_context *ev,
4057                                  struct tevent_fd *fde,
4058                                  uint16_t flags, void *private_data)
4059 {
4060         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
4061         _exit(1);
4062 }
4063
4064 /*
4065   called regularly to verify that the recovery daemon is still running
4066  */
4067 static void ctdb_check_recd(struct tevent_context *ev,
4068                             struct tevent_timer *te,
4069                             struct timeval yt, void *p)
4070 {
4071         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
4072
4073         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
4074                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
4075
4076                 tevent_add_timer(ctdb->ev, ctdb, timeval_zero(),
4077                                  ctdb_restart_recd, ctdb);
4078
4079                 return;
4080         }
4081
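             /* the recovery daemon is alive - check again in 30 seconds */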
4082         tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
4083                          timeval_current_ofs(30, 0),
4084                          ctdb_check_recd, ctdb);
4085 }
4086
4087 static void recd_sig_child_handler(struct tevent_context *ev,
4088                                    struct tevent_signal *se, int signum,
4089                                    int count, void *dont_care,
4090                                    void *private_data)
4091 {
4093         int status;
4094         pid_t pid = -1;
4095
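             /* reap all exited children without blocking: waitpid()
                returns 0 once no more children have exited, and -1 with
                ECHILD when no children are left at all */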
4096         while (pid != 0) {
4097                 pid = waitpid(-1, &status, WNOHANG);
4098                 if (pid == -1) {
4099                         if (errno != ECHILD) {
4100                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4101                         }
4102                         return;
4103                 }
4104                 if (pid > 0) {
4105                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4106                 }
4107         }
4108 }
4109
4110 /*
4111   startup the recovery daemon as a child of the main ctdb daemon
4112  */
4113 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4114 {
4115         int fd[2];
4116         struct tevent_signal *se;
4117         struct tevent_fd *fde;
4118
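             /* this pipe is used to detect the death of the main daemon:
                the parent keeps the write end open and the child watches
                the read end, which becomes readable (EOF) as soon as the
                parent exits */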
4119         if (pipe(fd) != 0) {
4120                 return -1;
4121         }
4122
4123         ctdb->recoverd_pid = ctdb_fork(ctdb);
4124         if (ctdb->recoverd_pid == -1) {
4125                 return -1;
4126         }
4127
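             /* parent: close the read end and poll the child every 30
                seconds via ctdb_check_recd() */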
4128         if (ctdb->recoverd_pid != 0) {
4129                 talloc_free(ctdb->recd_ctx);
4130                 ctdb->recd_ctx = talloc_new(ctdb);
4131                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4132
4133                 close(fd[0]);
4134                 tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
4135                                  timeval_current_ofs(30, 0),
4136                                  ctdb_check_recd, ctdb);
4137                 return 0;
4138         }
4139
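             /* child: close the write end so that only the parent holds
                the pipe open */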
4140         close(fd[1]);
4141
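             /* give the child its own random sequence, distinct from the
                parent's */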
4142         srandom(getpid() ^ time(NULL));
4143
4144         ctdb_set_process_name("ctdb_recoverd");
4145         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4146                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. Shutting down.\n"));
4147                 exit(1);
4148         }
4149
4150         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4151
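             /* when the parent dies the read end of the pipe gets EOF and
                becomes readable; ctdb_recoverd_parent() then exits this
                process */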
4152         fde = tevent_add_fd(ctdb->ev, ctdb, fd[0], TEVENT_FD_READ,
4153                             ctdb_recoverd_parent, &fd[0]);
4154         tevent_fd_set_auto_close(fde);
4155
4156         /* set up a handler to pick up sigchld */
4157         se = tevent_add_signal(ctdb->ev, ctdb, SIGCHLD, 0,
4158                                recd_sig_child_handler, ctdb);
4159         if (se == NULL) {
4160                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4161                 exit(1);
4162         }
4163
4164         monitor_cluster(ctdb);
4165
4166         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4167         return -1;
4168 }
4169
4170 /*
4171   shutdown the recovery daemon
4172  */
4173 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4174 {
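             /* recoverd_pid == 0 means no recovery daemon was started */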
4175         if (ctdb->recoverd_pid == 0) {
4176                 return;
4177         }
4178
4179         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4180         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4181
4182         TALLOC_FREE(ctdb->recd_ctx);
4183         TALLOC_FREE(ctdb->recd_ping_count);
4184 }
4185
4186 static void ctdb_restart_recd(struct tevent_context *ev,
4187                               struct tevent_timer *te,
4188                               struct timeval t, void *private_data)
4189 {
4190         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4191
4192         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4193         ctdb_stop_recoverd(ctdb);
4194         ctdb_start_recoverd(ctdb);
4195 }