recoverd: When starting a takeover run disable IP verification
ctdb.git: server/ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "dlinklist.h"
31
32
33 /* the most recent "reload all ips" request, to be performed during the
34    next monitoring loop
35 */
36 struct reloadips_all_reply *reload_all_ips_request = NULL;
37
38 /* list of "ctdb ipreallocate" processes to call back when we have
39    finished the takeover run.
40 */
41 struct ip_reallocate_list {
42         struct ip_reallocate_list *next;
43         struct rd_memdump_reply *rd;
44 };
45
46 struct ctdb_banning_state {
47         uint32_t count;
48         struct timeval last_reported_time;
49 };
50
51 /*
52   private state of recovery daemon
53  */
54 struct ctdb_recoverd {
55         struct ctdb_context *ctdb;
56         uint32_t recmaster;
57         uint32_t num_active;
58         uint32_t num_connected;
59         uint32_t last_culprit_node;
60         struct ctdb_node_map *nodemap;
61         struct timeval priority_time;
62         bool need_takeover_run;
63         bool need_recovery;
64         uint32_t node_flags;
65         struct timed_event *send_election_te;
66         struct timed_event *election_timeout;
67         struct vacuum_info *vacuum_info;
68         TALLOC_CTX *ip_reallocate_ctx;
69         struct ip_reallocate_list *reallocate_callers;
70         TALLOC_CTX *ip_check_disable_ctx;
71         struct ctdb_control_get_ifaces *ifaces;
72         TALLOC_CTX *deferred_rebalance_ctx;
73 };
74
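/* both timeouts are derived from the recovery tunables, so they track the
   cluster's runtime configuration */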
75 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
76 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
77
78 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
79
80 /*
81   ban a node for a period of time
82  */
83 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
84 {
85         int ret;
86         struct ctdb_context *ctdb = rec->ctdb;
87         struct ctdb_ban_time bantime;
88        
89         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
90
91         if (!ctdb_validate_pnn(ctdb, pnn)) {
92                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
93                 return;
94         }
95
96         bantime.pnn  = pnn;
97         bantime.time = ban_time;
98
99         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
100         if (ret != 0) {
101                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
102                 return;
103         }
104
105 }
106
107 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
108
109
110 /*
111   run the "recovered" eventscript on all nodes
112  */
113 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
114 {
115         TALLOC_CTX *tmp_ctx;
116         uint32_t *nodes;
117
118         tmp_ctx = talloc_new(ctdb);
119         CTDB_NO_MEMORY(ctdb, tmp_ctx);
120
121         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
122         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
123                                         nodes, 0,
124                                         CONTROL_TIMEOUT(), false, tdb_null,
125                                         NULL, NULL,
126                                         NULL) != 0) {
127                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
128
129                 talloc_free(tmp_ctx);
130                 return -1;
131         }
132
133         talloc_free(tmp_ctx);
134         return 0;
135 }
136
137 /*
138   remember the troublemaker, charging the given number of credits
139  */
140 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
141 {
142         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
143         struct ctdb_banning_state *ban_state;
144
145         if (culprit >= ctdb->num_nodes) {
146                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
147                 return;
148         }
149
150         if (ctdb->nodes[culprit]->ban_state == NULL) {
151                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
152                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
153
154                 
155         }
156         ban_state = ctdb->nodes[culprit]->ban_state;
157         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
158                 /* this was the first time in a long while this node
159                    misbehaved so we will forgive any old transgressions.
160                 */
161                 ban_state->count = 0;
162         }
163
164         ban_state->count += count;
165         ban_state->last_reported_time = timeval_current();
166         rec->last_culprit_node = culprit;
167 }
168
169 /*
170   remember the troublemaker
171  */
172 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
173 {
174         ctdb_set_culprit_count(rec, culprit, 1);
175 }
176
177
178 /* this callback is called for every node that failed to execute the
179    start recovery event
180 */
181 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
182 {
183         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
184
185         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
186
187         ctdb_set_culprit(rec, node_pnn);
188 }
189
190 /*
191   run the "startrecovery" eventscript on all nodes
192  */
193 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
194 {
195         TALLOC_CTX *tmp_ctx;
196         uint32_t *nodes;
197         struct ctdb_context *ctdb = rec->ctdb;
198
199         tmp_ctx = talloc_new(ctdb);
200         CTDB_NO_MEMORY(ctdb, tmp_ctx);
201
202         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
203         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
204                                         nodes, 0,
205                                         CONTROL_TIMEOUT(), false, tdb_null,
206                                         NULL,
207                                         startrecovery_fail_callback,
208                                         rec) != 0) {
209                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
210                 talloc_free(tmp_ctx);
211                 return -1;
212         }
213
214         talloc_free(tmp_ctx);
215         return 0;
216 }
217
218 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
219 {
220         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
221                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
222                 return;
223         }
224         if (node_pnn < ctdb->num_nodes) {
225                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
226         }
227
228         if (node_pnn == ctdb->pnn) {
229                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
230         }
231 }
232
233 /*
234   update the node capabilities for all connected nodes
235  */
236 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
237 {
238         uint32_t *nodes;
239         TALLOC_CTX *tmp_ctx;
240
241         tmp_ctx = talloc_new(ctdb);
242         CTDB_NO_MEMORY(ctdb, tmp_ctx);
243
244         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
245         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
246                                         nodes, 0,
247                                         CONTROL_TIMEOUT(),
248                                         false, tdb_null,
249                                         async_getcap_callback, NULL,
250                                         NULL) != 0) {
251                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
252                 talloc_free(tmp_ctx);
253                 return -1;
254         }
255
256         talloc_free(tmp_ctx);
257         return 0;
258 }
259
260 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
261 {
262         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
263
264         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
265         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
266 }
267
268 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
269 {
270         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
271
272         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
273         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
274 }
275
276 /*
277   change recovery mode on all nodes
278  */
279 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
280 {
281         TDB_DATA data;
282         uint32_t *nodes;
283         TALLOC_CTX *tmp_ctx;
284
285         tmp_ctx = talloc_new(ctdb);
286         CTDB_NO_MEMORY(ctdb, tmp_ctx);
287
288         /* when entering recovery, freeze all databases on the active nodes, one priority level at a time */
289         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
290         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
291                 int i;
292
293                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
294                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
295                                                 nodes, i,
296                                                 CONTROL_TIMEOUT(),
297                                                 false, tdb_null,
298                                                 NULL,
299                                                 set_recmode_fail_callback,
300                                                 rec) != 0) {
301                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
302                                 talloc_free(tmp_ctx);
303                                 return -1;
304                         }
305                 }
306         }
307
308
309         data.dsize = sizeof(uint32_t);
310         data.dptr = (unsigned char *)&rec_mode;
311
312         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
313                                         nodes, 0,
314                                         CONTROL_TIMEOUT(),
315                                         false, data,
316                                         NULL, NULL,
317                                         NULL) != 0) {
318                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
319                 talloc_free(tmp_ctx);
320                 return -1;
321         }
322
323         talloc_free(tmp_ctx);
324         return 0;
325 }
326
327 /*
328   change the recovery master on all nodes
329  */
330 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
331 {
332         TDB_DATA data;
333         TALLOC_CTX *tmp_ctx;
334         uint32_t *nodes;
335
336         tmp_ctx = talloc_new(ctdb);
337         CTDB_NO_MEMORY(ctdb, tmp_ctx);
338
339         data.dsize = sizeof(uint32_t);
340         data.dptr = (unsigned char *)&pnn;
341
342         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
343         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
344                                         nodes, 0,
345                                         CONTROL_TIMEOUT(), false, data,
346                                         NULL, NULL,
347                                         NULL) != 0) {
348                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
349                 talloc_free(tmp_ctx);
350                 return -1;
351         }
352
353         talloc_free(tmp_ctx);
354         return 0;
355 }
356
357 /* update all remote nodes to use the same db priority that we have.
358    this can fail if the remote node has not yet been upgraded to
359    support this function, so we always return success and never fail
360    a recovery if this call fails.
361 */
362 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
363         struct ctdb_node_map *nodemap, 
364         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
365 {
366         int db;
367         uint32_t *nodes;
368
369         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
370
371         /* step through all local databases */
372         for (db=0; db<dbmap->num;db++) {
373                 TDB_DATA data;
374                 struct ctdb_db_priority db_prio;
375                 int ret;
376
377                 db_prio.db_id     = dbmap->dbs[db].dbid;
378                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
379                 if (ret != 0) {
380                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
381                         continue;
382                 }
383
384                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
385
386                 data.dptr  = (uint8_t *)&db_prio;
387                 data.dsize = sizeof(db_prio);
388
389                 if (ctdb_client_async_control(ctdb,
390                                         CTDB_CONTROL_SET_DB_PRIORITY,
391                                         nodes, 0,
392                                         CONTROL_TIMEOUT(), false, data,
393                                         NULL, NULL,
394                                         NULL) != 0) {
395                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
396                 }
397         }
398
399         return 0;
400 }                       
401
402 /*
403   ensure all other nodes have attached to any databases that we have
404  */
405 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
406                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
407 {
408         int i, j, db, ret;
409         struct ctdb_dbid_map *remote_dbmap;
410
411         /* verify that all other nodes have all our databases */
412         for (j=0; j<nodemap->num; j++) {
413                 /* we don't need to check ourselves */
414                 if (nodemap->nodes[j].pnn == pnn) {
415                         continue;
416                 }
417                 /* don't check nodes that are unavailable */
418                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
419                         continue;
420                 }
421
422                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
423                                          mem_ctx, &remote_dbmap);
424                 if (ret != 0) {
425                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
426                         return -1;
427                 }
428
429                 /* step through all local databases */
430                 for (db=0; db<dbmap->num;db++) {
431                         const char *name;
432
433
434                         for (i=0;i<remote_dbmap->num;i++) {
435                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
436                                         break;
437                                 }
438                         }
439                         /* the remote node already has this database */
440                         if (i!=remote_dbmap->num) {
441                                 continue;
442                         }
443                         /* ok so we need to create this database */
444                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
445                                             mem_ctx, &name);
446                         if (ret != 0) {
447                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
448                                 return -1;
449                         }
450                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
451                                            mem_ctx, name,
452                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
453                         if (ret != 0) {
454                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
455                                 return -1;
456                         }
457                 }
458         }
459
460         return 0;
461 }
462
463
464 /*
465   ensure we are attached to any databases that anyone else is attached to
466  */
467 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
468                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
469 {
470         int i, j, db, ret;
471         struct ctdb_dbid_map *remote_dbmap;
472
473         /* verify that we have all databases any other node has */
474         for (j=0; j<nodemap->num; j++) {
475                 /* we don't need to check ourselves */
476                 if (nodemap->nodes[j].pnn == pnn) {
477                         continue;
478                 }
479                 /* don't check nodes that are unavailable */
480                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
481                         continue;
482                 }
483
484                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
485                                          mem_ctx, &remote_dbmap);
486                 if (ret != 0) {
487                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
488                         return -1;
489                 }
490
491                 /* step through all databases on the remote node */
492                 for (db=0; db<remote_dbmap->num;db++) {
493                         const char *name;
494
495                         for (i=0;i<(*dbmap)->num;i++) {
496                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
497                                         break;
498                                 }
499                         }
500                         /* we already have this db locally */
501                         if (i!=(*dbmap)->num) {
502                                 continue;
503                         }
504                         /* ok so we need to create this database and
505                            rebuild dbmap
506                          */
507                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
508                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
509                         if (ret != 0) {
510                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
511                                           nodemap->nodes[j].pnn));
512                                 return -1;
513                         }
514                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
515                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
516                         if (ret != 0) {
517                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
518                                 return -1;
519                         }
520                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
521                         if (ret != 0) {
522                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
523                                 return -1;
524                         }
525                 }
526         }
527
528         return 0;
529 }
530
531
532 /*
533   pull the remote database contents from one node into the recdb
534  */
535 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
536                                     struct tdb_wrap *recdb, uint32_t dbid)
537 {
538         int ret;
539         TDB_DATA outdata;
540         struct ctdb_marshall_buffer *reply;
541         struct ctdb_rec_data *rec;
542         int i;
543         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
544
545         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
546                                CONTROL_TIMEOUT(), &outdata);
547         if (ret != 0) {
548                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
549                 talloc_free(tmp_ctx);
550                 return -1;
551         }
552
553         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
554
555         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
556                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
557                 talloc_free(tmp_ctx);
558                 return -1;
559         }
560         
561         rec = (struct ctdb_rec_data *)&reply->data[0];
562         
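        /* the pulled records are packed back to back in the marshall buffer;
           each iteration advances by the total length of the current record */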
563         for (i=0;
564              i<reply->count;
565              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
566                 TDB_DATA key, data;
567                 struct ctdb_ltdb_header *hdr;
568                 TDB_DATA existing;
569                 
570                 key.dptr = &rec->data[0];
571                 key.dsize = rec->keylen;
572                 data.dptr = &rec->data[key.dsize];
573                 data.dsize = rec->datalen;
574                 
575                 hdr = (struct ctdb_ltdb_header *)data.dptr;
576
577                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
578                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
579                         talloc_free(tmp_ctx);
580                         return -1;
581                 }
582
583                 /* fetch the existing record, if any */
584                 existing = tdb_fetch(recdb->tdb, key);
585                 
586                 if (existing.dptr != NULL) {
587                         struct ctdb_ltdb_header header;
588                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
589                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
590                                          (unsigned)existing.dsize, srcnode));
591                                 free(existing.dptr);
592                                 talloc_free(tmp_ctx);
593                                 return -1;
594                         }
595                         header = *(struct ctdb_ltdb_header *)existing.dptr;
596                         free(existing.dptr);
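                        /* keep the existing record unless the pulled copy has a
                           higher rsn, or the rsns are equal and the existing
                           record's dmaster is not the recovery master */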
597                         if (!(header.rsn < hdr->rsn ||
598                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
599                                 continue;
600                         }
601                 }
602                 
603                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
604                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
605                         talloc_free(tmp_ctx);
606                         return -1;                              
607                 }
608         }
609
610         talloc_free(tmp_ctx);
611
612         return 0;
613 }
614
615
616 struct pull_seqnum_cbdata {
617         int failed;
618         uint32_t pnn;
619         uint64_t seqnum;
620 };
621
622 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
623 {
624         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
625         uint64_t seqnum;
626
627         if (cb_data->failed != 0) {
628                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
629                 return;
630         }
631
632         if (res != 0) {
633                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
634                 cb_data->failed = 1;
635                 return;
636         }
637
638         if (outdata.dsize != sizeof(uint64_t)) {
639                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
640                 cb_data->failed = -1;
641                 return;
642         }
643
644         seqnum = *((uint64_t *)outdata.dptr);
645
646         if (seqnum > cb_data->seqnum) {
647                 cb_data->seqnum = seqnum;
648                 cb_data->pnn = node_pnn;
649         }
650 }
651
652 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
653 {
654         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
655
656         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
657         cb_data->failed = 1;
658 }
659
660 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
661                                 struct ctdb_recoverd *rec, 
662                                 struct ctdb_node_map *nodemap, 
663                                 struct tdb_wrap *recdb, uint32_t dbid)
664 {
665         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
666         uint32_t *nodes;
667         TDB_DATA data;
668         uint32_t outdata[2];
669         struct pull_seqnum_cbdata *cb_data;
670
671         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
672
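        /* the GET_DB_SEQNUM control expects a 64-bit database id; outdata[]
           packs the 32-bit dbid followed by a zero high word (assumption
           based on how the data is laid out here) */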
673         outdata[0] = dbid;
674         outdata[1] = 0;
675
676         data.dsize = sizeof(outdata);
677         data.dptr  = (uint8_t *)&outdata[0];
678
679         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
680         if (cb_data == NULL) {
681                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
682                 talloc_free(tmp_ctx);
683                 return -1;
684         }
685
686         cb_data->failed = 0;
687         cb_data->pnn    = -1;
688         cb_data->seqnum = 0;
689         
690         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
691         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
692                                         nodes, 0,
693                                         CONTROL_TIMEOUT(), false, data,
694                                         pull_seqnum_cb,
695                                         pull_seqnum_fail_cb,
696                                         cb_data) != 0) {
697                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
698
699                 talloc_free(tmp_ctx);
700                 return -1;
701         }
702
703         if (cb_data->failed != 0) {
704                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
705                 talloc_free(tmp_ctx);
706                 return -1;
707         }
708
709         if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
710                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
711                 talloc_free(tmp_ctx);
712                 return -1;
713         }
714
715         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
716
717         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
718                 DEBUG(DEBUG_ERR, ("Failed to pull highest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
719                 talloc_free(tmp_ctx);
720                 return -1;
721         }
722
723         talloc_free(tmp_ctx);
724         return 0;
725 }
726
727
728 /*
729   pull all the remote database contents into the recdb
730  */
731 static int pull_remote_database(struct ctdb_context *ctdb,
732                                 struct ctdb_recoverd *rec, 
733                                 struct ctdb_node_map *nodemap, 
734                                 struct tdb_wrap *recdb, uint32_t dbid,
735                                 bool persistent)
736 {
737         int j;
738
739         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
740                 int ret;
741                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
742                 if (ret == 0) {
743                         return 0;
744                 }
745         }
746
747         /* pull all records from all other nodes across onto this node
748            (this merges based on rsn)
749         */
750         for (j=0; j<nodemap->num; j++) {
751                 /* don't merge from nodes that are unavailable */
752                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
753                         continue;
754                 }
755                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
756                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
757                                  nodemap->nodes[j].pnn));
758                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
759                         return -1;
760                 }
761         }
762         
763         return 0;
764 }
765
766
767 /*
768   update flags on all active nodes
769  */
770 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
771 {
772         int ret;
773
774         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
775         if (ret != 0) {
776                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
777                 return -1;
778         }
779
780         return 0;
781 }
782
783 /*
784   ensure all nodes have the same vnnmap we do
785  */
786 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
787                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
788 {
789         int j, ret;
790
791         /* push the new vnn map out to all the nodes */
792         for (j=0; j<nodemap->num; j++) {
793                 /* don't push to nodes that are unavailable */
794                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
795                         continue;
796                 }
797
798                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
799                 if (ret != 0) {
800                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
801                         return -1;
802                 }
803         }
804
805         return 0;
806 }
807
808
809 struct vacuum_info {
810         struct vacuum_info *next, *prev;
811         struct ctdb_recoverd *rec;
812         uint32_t srcnode;
813         struct ctdb_db_context *ctdb_db;
814         struct ctdb_marshall_buffer *recs;
815         struct ctdb_rec_data *r;
816 };
817
818 static void vacuum_fetch_next(struct vacuum_info *v);
819
820 /*
821   called when a vacuum fetch has completed - just free it and do the next one
822  */
823 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
824 {
825         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
826         talloc_free(state);
827         vacuum_fetch_next(v);
828 }
829
830
831 /*
832   process the next element from the vacuum list
833 */
834 static void vacuum_fetch_next(struct vacuum_info *v)
835 {
836         struct ctdb_call call;
837         struct ctdb_rec_data *r;
838
839         while (v->recs->count) {
840                 struct ctdb_client_call_state *state;
841                 TDB_DATA data;
842                 struct ctdb_ltdb_header *hdr;
843
844                 ZERO_STRUCT(call);
845                 call.call_id = CTDB_NULL_FUNC;
846                 call.flags = CTDB_IMMEDIATE_MIGRATION;
847                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
848
849                 r = v->r;
850                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
851                 v->recs->count--;
852
853                 call.key.dptr = &r->data[0];
854                 call.key.dsize = r->keylen;
855
856                 /* ensure we don't block this daemon - just skip a record if we can't get
857                    the chainlock */
858                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
859                         continue;
860                 }
861
862                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
863                 if (data.dptr == NULL) {
864                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
865                         continue;
866                 }
867
868                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
869                         free(data.dptr);
870                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
871                         continue;
872                 }
873                 
874                 hdr = (struct ctdb_ltdb_header *)data.dptr;
875                 if (hdr->dmaster == v->rec->ctdb->pnn) {
876                         /* it's already local */
877                         free(data.dptr);
878                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
879                         continue;
880                 }
881
882                 free(data.dptr);
883
884                 state = ctdb_call_send(v->ctdb_db, &call);
885                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
886                 if (state == NULL) {
887                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
888                         talloc_free(v);
889                         return;
890                 }
891                 state->async.fn = vacuum_fetch_callback;
892                 state->async.private_data = v;
893                 return;
894         }
895
896         talloc_free(v);
897 }
898
899
900 /*
901   destroy a vacuum info structure
902  */
903 static int vacuum_info_destructor(struct vacuum_info *v)
904 {
905         DLIST_REMOVE(v->rec->vacuum_info, v);
906         return 0;
907 }
908
909
910 /*
911   handler for vacuum fetch
912 */
913 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
914                                  TDB_DATA data, void *private_data)
915 {
916         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
917         struct ctdb_marshall_buffer *recs;
918         int ret, i;
919         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
920         const char *name;
921         struct ctdb_dbid_map *dbmap=NULL;
922         bool persistent = false;
923         struct ctdb_db_context *ctdb_db;
924         struct ctdb_rec_data *r;
925         uint32_t srcnode;
926         struct vacuum_info *v;
927
928         recs = (struct ctdb_marshall_buffer *)data.dptr;
929         r = (struct ctdb_rec_data *)&recs->data[0];
930
931         if (recs->count == 0) {
932                 talloc_free(tmp_ctx);
933                 return;
934         }
935
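        /* the sending node stores its pnn in the reqid field of the first
           marshalled record, which identifies the source of these records */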
936         srcnode = r->reqid;
937
938         for (v=rec->vacuum_info;v;v=v->next) {
939                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
940                         /* we're already working on records from this node */
941                         talloc_free(tmp_ctx);
942                         return;
943                 }
944         }
945
946         /* work out if the database is persistent */
947         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
948         if (ret != 0) {
949                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
950                 talloc_free(tmp_ctx);
951                 return;
952         }
953
954         for (i=0;i<dbmap->num;i++) {
955                 if (dbmap->dbs[i].dbid == recs->db_id) {
956                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
957                         break;
958                 }
959         }
960         if (i == dbmap->num) {
961                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
962                 talloc_free(tmp_ctx);
963                 return;         
964         }
965
966         /* find the name of this database */
967         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
968                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
969                 talloc_free(tmp_ctx);
970                 return;
971         }
972
973         /* attach to it */
974         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
975         if (ctdb_db == NULL) {
976                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
977                 talloc_free(tmp_ctx);
978                 return;
979         }
980
981         v = talloc_zero(rec, struct vacuum_info);
982         if (v == NULL) {
983                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
984                 talloc_free(tmp_ctx);
985                 return;
986         }
987
988         v->rec = rec;
989         v->srcnode = srcnode;
990         v->ctdb_db = ctdb_db;
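        /* take a private copy of the marshalled records; the incoming data
           buffer belongs to the message handler and goes away on return */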
991         v->recs = talloc_memdup(v, recs, data.dsize);
992         if (v->recs == NULL) {
993                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
994                 talloc_free(v);
995                 talloc_free(tmp_ctx);
996                 return;         
997         }
998         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
999
1000         DLIST_ADD(rec->vacuum_info, v);
1001
1002         talloc_set_destructor(v, vacuum_info_destructor);
1003
1004         vacuum_fetch_next(v);
1005         talloc_free(tmp_ctx);
1006 }
1007
1008
1009 /*
1010   called when ctdb_wait_timeout should finish
1011  */
1012 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1013                               struct timeval yt, void *p)
1014 {
1015         uint32_t *timed_out = (uint32_t *)p;
1016         (*timed_out) = 1;
1017 }
1018
1019 /*
1020   wait for a given number of seconds
1021  */
1022 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1023 {
1024         uint32_t timed_out = 0;
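        /* split the timeout into whole seconds and the remaining microseconds */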
1025         time_t usecs = (secs - (time_t)secs) * 1000000;
1026         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1027         while (!timed_out) {
1028                 event_loop_once(ctdb->ev);
1029         }
1030 }
1031
1032 /*
1033   called when an election times out (ends)
1034  */
1035 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1036                                   struct timeval t, void *p)
1037 {
1038         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1039         rec->election_timeout = NULL;
1040         fast_start = false;
1041
1042         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1043 }
1044
1045
1046 /*
1047   wait for an election to finish. It finishes election_timeout seconds after
1048   the last election packet is received
1049  */
1050 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1051 {
1052         struct ctdb_context *ctdb = rec->ctdb;
1053         while (rec->election_timeout) {
1054                 event_loop_once(ctdb->ev);
1055         }
1056 }
1057
1058 /*
1059   Update our local flags from all remote connected nodes.
1060   This is only run when we are, or believe we are, the recovery master
1061  */
1062 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1063 {
1064         int j;
1065         struct ctdb_context *ctdb = rec->ctdb;
1066         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1067
1068         /* get the nodemap for all active remote nodes and verify
1069            they are the same as for this node
1070          */
1071         for (j=0; j<nodemap->num; j++) {
1072                 struct ctdb_node_map *remote_nodemap=NULL;
1073                 int ret;
1074
1075                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1076                         continue;
1077                 }
1078                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1079                         continue;
1080                 }
1081
1082                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1083                                            mem_ctx, &remote_nodemap);
1084                 if (ret != 0) {
1085                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1086                                   nodemap->nodes[j].pnn));
1087                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1088                         talloc_free(mem_ctx);
1089                         return MONITOR_FAILED;
1090                 }
1091                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1092                         /* We should tell our daemon about this so it
1093                            updates its flags or else we will log the same 
1094                            message again in the next iteration of recovery.
1095                            Since we are the recovery master we can just as
1096                            well update the flags on all nodes.
1097                         */
1098                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
1099                         if (ret != 0) {
1100                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1101                                 return MONITOR_FAILED;
1102                         }
1103
1104                         /* Update our local copy of the flags in the recovery
1105                            daemon.
1106                         */
1107                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1108                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1109                                  nodemap->nodes[j].flags));
1110                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1111                 }
1112                 talloc_free(remote_nodemap);
1113         }
1114         talloc_free(mem_ctx);
1115         return MONITOR_OK;
1116 }
1117
1118
1119 /* Create a new random generation id.
1120    The generation id cannot be the INVALID_GENERATION id
1121 */
1122 static uint32_t new_generation(void)
1123 {
1124         uint32_t generation;
1125
1126         while (1) {
1127                 generation = random();
1128
1129                 if (generation != INVALID_GENERATION) {
1130                         break;
1131                 }
1132         }
1133
1134         return generation;
1135 }
1136
1137
1138 /*
1139   create a temporary working database
1140  */
1141 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1142 {
1143         char *name;
1144         struct tdb_wrap *recdb;
1145         unsigned tdb_flags;
1146
1147         /* open up the temporary recovery database */
1148         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1149                                ctdb->db_directory_state,
1150                                ctdb->pnn);
1151         if (name == NULL) {
1152                 return NULL;
1153         }
1154         unlink(name);
1155
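        /* the temporary recovery db is only touched by this process, so tdb
           locking can be skipped; avoid mmap when running under valgrind */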
1156         tdb_flags = TDB_NOLOCK;
1157         if (ctdb->valgrinding) {
1158                 tdb_flags |= TDB_NOMMAP;
1159         }
1160         tdb_flags |= TDB_DISALLOW_NESTING;
1161
1162         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1163                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1164         if (recdb == NULL) {
1165                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1166         }
1167
1168         talloc_free(name);
1169
1170         return recdb;
1171 }
1172
1173
1174 /* 
1175    a traverse function for pulling all relevant records from recdb
1176  */
1177 struct recdb_data {
1178         struct ctdb_context *ctdb;
1179         struct ctdb_marshall_buffer *recdata;
1180         uint32_t len;
1181         uint32_t allocated_len;
1182         bool failed;
1183         bool persistent;
1184 };
1185
1186 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1187 {
1188         struct recdb_data *params = (struct recdb_data *)p;
1189         struct ctdb_rec_data *rec;
1190         struct ctdb_ltdb_header *hdr;
1191
1192         /* skip empty records */
1193         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1194                 return 0;
1195         }
1196
1197         /* update the dmaster field to point to us */
1198         hdr = (struct ctdb_ltdb_header *)data.dptr;
1199         if (!params->persistent) {
1200                 hdr->dmaster = params->ctdb->pnn;
1201                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1202         }
1203
1204         /* add the record to the blob ready to send to the nodes */
1205         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1206         if (rec == NULL) {
1207                 params->failed = true;
1208                 return -1;
1209         }
1210         if (params->len + rec->length >= params->allocated_len) {
1211                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1212                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1213         }
1214         if (params->recdata == NULL) {
1215                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u bytes\n", 
1216                          rec->length + params->len));
1217                 params->failed = true;
1218                 return -1;
1219         }
1220         params->recdata->count++;
1221         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1222         params->len += rec->length;
1223         talloc_free(rec);
1224
1225         return 0;
1226 }
1227
1228 /*
1229   push the recdb database out to all nodes
1230  */
1231 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1232                                bool persistent,
1233                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1234 {
1235         struct recdb_data params;
1236         struct ctdb_marshall_buffer *recdata;
1237         TDB_DATA outdata;
1238         TALLOC_CTX *tmp_ctx;
1239         uint32_t *nodes;
1240
1241         tmp_ctx = talloc_new(ctdb);
1242         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1243
1244         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1245         CTDB_NO_MEMORY(ctdb, recdata);
1246
1247         recdata->db_id = dbid;
1248
1249         params.ctdb = ctdb;
1250         params.recdata = recdata;
1251         params.len = offsetof(struct ctdb_marshall_buffer, data);
1252         params.allocated_len = params.len;
1253         params.failed = false;
1254         params.persistent = persistent;
1255
1256         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1257                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1258                 talloc_free(params.recdata);
1259                 talloc_free(tmp_ctx);
1260                 return -1;
1261         }
1262
1263         if (params.failed) {
1264                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1265                 talloc_free(params.recdata);
1266                 talloc_free(tmp_ctx);
1267                 return -1;              
1268         }
1269
1270         recdata = params.recdata;
1271
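        /* send only the used portion of the marshall buffer, not the full
           preallocated size */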
1272         outdata.dptr = (void *)recdata;
1273         outdata.dsize = params.len;
1274
1275         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1276         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1277                                         nodes, 0,
1278                                         CONTROL_TIMEOUT(), false, outdata,
1279                                         NULL, NULL,
1280                                         NULL) != 0) {
1281                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1282                 talloc_free(recdata);
1283                 talloc_free(tmp_ctx);
1284                 return -1;
1285         }
1286
1287         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1288                   dbid, recdata->count));
1289
1290         talloc_free(recdata);
1291         talloc_free(tmp_ctx);
1292
1293         return 0;
1294 }
1295
1296
1297 /*
1298   go through a full recovery on one database 
1299  */
1300 static int recover_database(struct ctdb_recoverd *rec, 
1301                             TALLOC_CTX *mem_ctx,
1302                             uint32_t dbid,
1303                             bool persistent,
1304                             uint32_t pnn, 
1305                             struct ctdb_node_map *nodemap,
1306                             uint32_t transaction_id)
1307 {
1308         struct tdb_wrap *recdb;
1309         int ret;
1310         struct ctdb_context *ctdb = rec->ctdb;
1311         TDB_DATA data;
1312         struct ctdb_control_wipe_database w;
1313         uint32_t *nodes;
1314
1315         recdb = create_recdb(ctdb, mem_ctx);
1316         if (recdb == NULL) {
1317                 return -1;
1318         }
1319
1320         /* pull all remote databases onto the recdb */
1321         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1322         if (ret != 0) {
1323                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1324                 return -1;
1325         }
1326
1327         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1328
1329         /* wipe all the remote databases. This is safe as we are in a transaction */
1330         w.db_id = dbid;
1331         w.transaction_id = transaction_id;
1332
1333         data.dptr = (void *)&w;
1334         data.dsize = sizeof(w);
1335
1336         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1337         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1338                                         nodes, 0,
1339                                         CONTROL_TIMEOUT(), false, data,
1340                                         NULL, NULL,
1341                                         NULL) != 0) {
1342                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1343                 talloc_free(recdb);
1344                 return -1;
1345         }
1346         
1347         /* push out the correct database. This sets the dmaster and skips 
1348            the empty records */
1349         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1350         if (ret != 0) {
1351                 talloc_free(recdb);
1352                 return -1;
1353         }
1354
1355         /* all done with this database */
1356         talloc_free(recdb);
1357
1358         return 0;
1359 }
1360
1361 /*
1362   reload the nodes file 
1363 */
1364 static void reload_nodes_file(struct ctdb_context *ctdb)
1365 {
1366         ctdb->nodes = NULL;
1367         ctdb_load_nodes_file(ctdb);
1368 }
1369
1370 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1371                                          struct ctdb_recoverd *rec,
1372                                          struct ctdb_node_map *nodemap,
1373                                          uint32_t *culprit)
1374 {
1375         int j;
1376         int ret;
1377
1378         if (ctdb->num_nodes != nodemap->num) {
1379                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1380                                   ctdb->num_nodes, nodemap->num));
1381                 if (culprit) {
1382                         *culprit = ctdb->pnn;
1383                 }
1384                 return -1;
1385         }
1386
1387         for (j=0; j<nodemap->num; j++) {
1388                 /* release any existing data */
1389                 if (ctdb->nodes[j]->known_public_ips) {
1390                         talloc_free(ctdb->nodes[j]->known_public_ips);
1391                         ctdb->nodes[j]->known_public_ips = NULL;
1392                 }
1393                 if (ctdb->nodes[j]->available_public_ips) {
1394                         talloc_free(ctdb->nodes[j]->available_public_ips);
1395                         ctdb->nodes[j]->available_public_ips = NULL;
1396                 }
1397
1398                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1399                         continue;
1400                 }
1401
1402                 /* grab a new shiny list of public ips from the node */
1403                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1404                                         CONTROL_TIMEOUT(),
1405                                         ctdb->nodes[j]->pnn,
1406                                         ctdb->nodes,
1407                                         0,
1408                                         &ctdb->nodes[j]->known_public_ips);
1409                 if (ret != 0) {
1410                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node %u\n",
1411                                 ctdb->nodes[j]->pnn));
1412                         if (culprit) {
1413                                 *culprit = ctdb->nodes[j]->pnn;
1414                         }
1415                         return -1;
1416                 }
1417
1418                 if (ctdb->do_checkpublicip) {
1419                         if (rec->ip_check_disable_ctx == NULL) {
1420                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1421                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1422                                         rec->need_takeover_run = true;
1423                                 }
1424                         }
1425                 }
1426
1427                 /* grab a new shiny list of available public ips from the node */
1428                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1429                                         CONTROL_TIMEOUT(),
1430                                         ctdb->nodes[j]->pnn,
1431                                         ctdb->nodes,
1432                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1433                                         &ctdb->nodes[j]->available_public_ips);
1434                 if (ret != 0) {
1435                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node %u\n",
1436                                 ctdb->nodes[j]->pnn));
1437                         if (culprit) {
1438                                 *culprit = ctdb->nodes[j]->pnn;
1439                         }
1440                         return -1;
1441                 }
1442         }
1443
1444         return 0;
1445 }
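/*
  note: two lists are fetched per node above.  The list fetched with
  flags == 0 ("known_public_ips") is what the node currently reports and
  is only used here to verify the existing allocation, while the list
  fetched with CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE
  ("available_public_ips") is what the node is able to host, which the
  subsequent takeover run presumably consults when reassigning addresses.
*/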
1446
1447 /* when we start a recovery, make sure all nodes use the same reclock file
1448    setting
1449 */
1450 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1451 {
1452         struct ctdb_context *ctdb = rec->ctdb;
1453         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1454         TDB_DATA data;
1455         uint32_t *nodes;
1456
1457         if (ctdb->recovery_lock_file == NULL) {
1458                 data.dptr  = NULL;
1459                 data.dsize = 0;
1460         } else {
1461                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1462                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1463         }
1464
1465         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1466         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1467                                         nodes, 0,
1468                                         CONTROL_TIMEOUT(),
1469                                         false, data,
1470                                         NULL, NULL,
1471                                         rec) != 0) {
1472                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1473                 talloc_free(tmp_ctx);
1474                 return -1;
1475         }
1476
1477         talloc_free(tmp_ctx);
1478         return 0;
1479 }
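/*
  the reclock setting of the local recovery master is pushed to every
  active node before recovery starts; "no reclock file" is transmitted
  as a zero length blob, so the whole cluster ends up with one
  consistent setting either way.
*/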
1480
1481
1482 /*
1483   we are the recmaster, and recovery is needed - start a recovery run
1484  */
1485 static int do_recovery(struct ctdb_recoverd *rec, 
1486                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1487                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1488 {
1489         struct ctdb_context *ctdb = rec->ctdb;
1490         int i, j, ret;
1491         uint32_t generation;
1492         struct ctdb_dbid_map *dbmap;
1493         TDB_DATA data;
1494         uint32_t *nodes;
1495         struct timeval start_time;
1496         uint32_t culprit = (uint32_t)-1;
1497
1498         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1499
1500         /* if recovery fails, force it again */
1501         rec->need_recovery = true;
1502
1503         for (i=0; i<ctdb->num_nodes; i++) {
1504                 struct ctdb_banning_state *ban_state;
1505
1506                 if (ctdb->nodes[i]->ban_state == NULL) {
1507                         continue;
1508                 }
1509                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1510                 if (ban_state->count < 2*ctdb->num_nodes) {
1511                         continue;
1512                 }
1513                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1514                         ctdb->nodes[i]->pnn, ban_state->count,
1515                         ctdb->tunable.recovery_ban_period));
1516                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1517                 ban_state->count = 0;
1518         }
1519
1520
1521         if (ctdb->tunable.verify_recovery_lock != 0) {
1522                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1523                 start_time = timeval_current();
1524                 if (!ctdb_recovery_lock(ctdb, true)) {
1525                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1526                                          "and banning ourselves for %u seconds\n",
1527                                          ctdb->tunable.recovery_ban_period));
1528                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1529                         return -1;
1530                 }
1531                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1532                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1533         }
1534
1535         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1536
1537         /* get a list of all databases */
1538         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1539         if (ret != 0) {
1540                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1541                 return -1;
1542         }
1543
1544         /* we do the db creation before we set the recovery mode, so the freeze happens
1545            on all databases we will be dealing with. */
1546
1547         /* verify that we have all the databases any other node has */
1548         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1549         if (ret != 0) {
1550                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1551                 return -1;
1552         }
1553
1554         /* verify that all other nodes have all our databases */
1555         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1556         if (ret != 0) {
1557                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1558                 return -1;
1559         }
1560         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1561
1562         /* update the database priority for all remote databases */
1563         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1564         if (ret != 0) {
1565                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1566         }
1567         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1568
1569
1570         /* update all other nodes to use the same setting for reclock files
1571            as the local recovery master.
1572         */
1573         sync_recovery_lock_file_across_cluster(rec);
1574
1575         /* set recovery mode to active on all nodes */
1576         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1577         if (ret != 0) {
1578                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1579                 return -1;
1580         }
1581
1582         /* execute the "startrecovery" event script on all nodes */
1583         ret = run_startrecovery_eventscript(rec, nodemap);
1584         if (ret!=0) {
1585                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1586                 return -1;
1587         }
1588
1589         /*
1590           update all nodes to have the same flags that we have
1591          */
1592         for (i=0;i<nodemap->num;i++) {
1593                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1594                         continue;
1595                 }
1596
1597                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1598                 if (ret != 0) {
1599                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1600                         return -1;
1601                 }
1602         }
1603
1604         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1605
1606         /* pick a new generation number */
1607         generation = new_generation();
1608
1609         /* change the vnnmap on this node to use the new generation
1610            number, but not on any other nodes.
1611            This guarantees that if we abort the recovery prematurely
1612            for some reason (a node stops responding?)
1613            we can just return immediately and we will shortly
1614            re-enter recovery.
1615            I.e. we deliberately leave the cluster with an inconsistent
1616            generation id so that we can abort recovery at any stage and
1617            just restart it from scratch.
1618          */
1619         vnnmap->generation = generation;
1620         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1621         if (ret != 0) {
1622                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1623                 return -1;
1624         }
1625
1626         data.dptr = (void *)&generation;
1627         data.dsize = sizeof(uint32_t);
1628
1629         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1630         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1631                                         nodes, 0,
1632                                         CONTROL_TIMEOUT(), false, data,
1633                                         NULL,
1634                                         transaction_start_fail_callback,
1635                                         rec) != 0) {
1636                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1637                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1638                                         nodes, 0,
1639                                         CONTROL_TIMEOUT(), false, tdb_null,
1640                                         NULL,
1641                                         NULL,
1642                                         NULL) != 0) {
1643                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1644                 }
1645                 return -1;
1646         }
1647
1648         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1649
1650         for (i=0;i<dbmap->num;i++) {
1651                 ret = recover_database(rec, mem_ctx,
1652                                        dbmap->dbs[i].dbid,
1653                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1654                                        pnn, nodemap, generation);
1655                 if (ret != 0) {
1656                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1657                         return -1;
1658                 }
1659         }
1660
1661         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1662
1663         /* commit all the changes */
1664         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1665                                         nodes, 0,
1666                                         CONTROL_TIMEOUT(), false, data,
1667                                         NULL, NULL,
1668                                         NULL) != 0) {
1669                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1670                 return -1;
1671         }
1672
1673         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1674         
1675
1676         /* update the capabilities for all nodes */
1677         ret = update_capabilities(ctdb, nodemap);
1678         if (ret!=0) {
1679                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1680                 return -1;
1681         }
1682
1683         /* build a new vnn map with all the currently active and
1684            unbanned nodes */
1685         generation = new_generation();
1686         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1687         CTDB_NO_MEMORY(ctdb, vnnmap);
1688         vnnmap->generation = generation;
1689         vnnmap->size = 0;
1690         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1691         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1692         for (i=j=0;i<nodemap->num;i++) {
1693                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1694                         continue;
1695                 }
1696                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1697                         /* this node cannot be an lmaster */
1698                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1699                         continue;
1700                 }
1701
1702                 vnnmap->size++;
1703                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1704                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1705                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1706
1707         }
1708         if (vnnmap->size == 0) {
1709                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1710                 vnnmap->size++;
1711                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1712                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1713                 vnnmap->map[0] = pnn;
1714         }       
1715
1716         /* update to the new vnnmap on all nodes */
1717         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1718         if (ret != 0) {
1719                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1720                 return -1;
1721         }
1722
1723         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1724
1725         /* update recmaster to point to us for all nodes */
1726         ret = set_recovery_master(ctdb, nodemap, pnn);
1727         if (ret!=0) {
1728                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1729                 return -1;
1730         }
1731
1732         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1733
1734         /*
1735           update all nodes to have the same flags that we have
1736          */
1737         for (i=0;i<nodemap->num;i++) {
1738                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1739                         continue;
1740                 }
1741
1742                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1743                 if (ret != 0) {
1744                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1745                         return -1;
1746                 }
1747         }
1748
1749         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1750
1751         /* disable recovery mode */
1752         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1753         if (ret != 0) {
1754                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1755                 return -1;
1756         }
1757
1758         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1759
1760         /*
1761           tell nodes to takeover their public IPs
1762          */
1763         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1764         if (ret != 0) {
1765                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1766                                  culprit));
1767                 rec->need_takeover_run = true;
1768                 return -1;
1769         }
1770         rec->need_takeover_run = false;
1771         ret = ctdb_takeover_run(ctdb, nodemap);
1772         if (ret != 0) {
1773                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1774                 rec->need_takeover_run = true;
1775         }
1776
1777         /* execute the "recovered" event script on all nodes */
1778         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1779         if (ret!=0) {
1780                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1781                 return -1;
1782         }
1783
1784         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1785
1786         /* send a message to all clients telling them that the cluster 
1787            has been reconfigured */
1788         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1789
1790         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1791
1792         rec->need_recovery = false;
1793
1794         /* we managed to complete a full recovery, make sure to forgive
1795            any past sins by the nodes that could now participate in the
1796            recovery.
1797         */
1798         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1799         for (i=0;i<nodemap->num;i++) {
1800                 struct ctdb_banning_state *ban_state;
1801
1802                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1803                         continue;
1804                 }
1805
1806                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1807                 if (ban_state == NULL) {
1808                         continue;
1809                 }
1810
1811                 ban_state->count = 0;
1812         }
1813
1814
1815         /* We just finished a recovery successfully. 
1816            We now wait for rerecovery_timeout before we allow 
1817            another recovery to take place.
1818         */
1819         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1820         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1821         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1822
1823         return 0;
1824 }
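/*
  summary of the recovery sequence performed by do_recovery():
    1. ban any node that has caused too many recent recoveries
    2. take the recovery lock (when verify_recovery_lock is set)
    3. create missing local/remote databases and sync db priorities
    4. push the reclock setting and set recovery mode to ACTIVE
    5. run the "startrecovery" event and push our node flags
    6. start a transaction on all nodes, recover every database, commit
    7. rebuild the vnnmap from the active lmaster-capable nodes
    8. set the recmaster, return to NORMAL mode and rerun ip takeover
    9. run the "recovered" event, broadcast CTDB_SRVID_RECONFIGURE,
       reset the ban counts and sleep for rerecovery_timeout
*/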
1825
1826
1827 /*
1828   elections are won by first checking the number of connected nodes, then
1829   the priority time, then the pnn
1830  */
1831 struct election_message {
1832         uint32_t num_connected;
1833         struct timeval priority_time;
1834         uint32_t pnn;
1835         uint32_t node_flags;
1836 };
1837
1838 /*
1839   form this node's election data
1840  */
1841 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1842 {
1843         int ret, i;
1844         struct ctdb_node_map *nodemap;
1845         struct ctdb_context *ctdb = rec->ctdb;
1846
1847         ZERO_STRUCTP(em);
1848
1849         em->pnn = rec->ctdb->pnn;
1850         em->priority_time = rec->priority_time;
1851
1852         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1853         if (ret != 0) {
1854                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1855                 return;
1856         }
1857
1858         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1859         em->node_flags = rec->node_flags;
1860
1861         for (i=0;i<nodemap->num;i++) {
1862                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1863                         em->num_connected++;
1864                 }
1865         }
1866
1867         /* we shouldn't try to win this election if we can't be a recmaster */
1868         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1869                 em->num_connected = 0;
1870                 em->priority_time = timeval_current();
1871         }
1872
1873         talloc_free(nodemap);
1874 }
1875
1876 /*
1877   see if the given election data wins
1878  */
1879 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1880 {
1881         struct election_message myem;
1882         int cmp = 0;
1883
1884         ctdb_election_data(rec, &myem);
1885
1886         /* we can't win if we don't have the recmaster capability */
1887         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1888                 return false;
1889         }
1890
1891         /* we can't win if we are banned */
1892         if (rec->node_flags & NODE_FLAGS_BANNED) {
1893                 return false;
1894         }       
1895
1896         /* we can't win if we are stopped */
1897         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1898                 return false;
1899         }       
1900
1901         /* we will automatically win if the other node is banned */
1902         if (em->node_flags & NODE_FLAGS_BANNED) {
1903                 return true;
1904         }
1905
1906         /* we will automatically win if the other node is stopped */
1907         if (em->node_flags & NODE_FLAGS_STOPPED) {
1908                 return true;
1909         }
1910
1911         /* try to use the most connected node */
1912         if (cmp == 0) {
1913                 cmp = (int)myem.num_connected - (int)em->num_connected;
1914         }
1915
1916         /* then the longest running node */
1917         if (cmp == 0) {
1918                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1919         }
1920
1921         if (cmp == 0) {
1922                 cmp = (int)myem.pnn - (int)em->pnn;
1923         }
1924
1925         return cmp > 0;
1926 }
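/*
  the comparison above is a lexicographic ordering: a node without the
  recmaster capability, or one that is banned or stopped, is
  disqualified outright; then the node with more connected nodes wins,
  then the node with the earlier priority_time (i.e. the longest running
  recovery daemon) wins, and the numerically larger pnn breaks any
  remaining tie.  ctdb_election_win() returns true only when our own
  election data sorts strictly ahead of the incoming message.
*/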
1927
1928 /*
1929   send out an election request
1930  */
1931 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1932 {
1933         int ret;
1934         TDB_DATA election_data;
1935         struct election_message emsg;
1936         uint64_t srvid;
1937         struct ctdb_context *ctdb = rec->ctdb;
1938
1939         srvid = CTDB_SRVID_RECOVERY;
1940
1941         ctdb_election_data(rec, &emsg);
1942
1943         election_data.dsize = sizeof(struct election_message);
1944         election_data.dptr  = (unsigned char *)&emsg;
1945
1946
1947         /* send an election message to all active nodes */
1948         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1949         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1950
1951
1952         /* A new node that is already frozen has entered the cluster.
1953            The existing nodes are not frozen and don't need to be frozen
1954            until the election has ended and we start the actual recovery
1955         */
1956         if (update_recmaster == true) {
1957                 /* first we assume we will win the election and set 
1958                    recoverymaster to be ourself on the current node
1959                  */
1960                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1961                 if (ret != 0) {
1962                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1963                         return -1;
1964                 }
1965         }
1966
1967
1968         return 0;
1969 }
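/*
  when update_recmaster is true we optimistically record ourselves as
  recmaster on the local node before any reply is seen; if another node
  wins the election, election_handler() below overwrites this with the
  winner's pnn.
*/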
1970
1971 /*
1972   this function will unban all nodes in the cluster
1973 */
1974 static void unban_all_nodes(struct ctdb_context *ctdb)
1975 {
1976         int ret, i;
1977         struct ctdb_node_map *nodemap;
1978         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1979         
1980         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1981         if (ret != 0) {
1982                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1983                 return;
1984         }
1985
1986         for (i=0;i<nodemap->num;i++) {
1987                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1988                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1989                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1990                 }
1991         }
1992
1993         talloc_free(tmp_ctx);
1994 }
1995
1996
1997 /*
1998   we think we are winning the election - send a broadcast election request
1999  */
2000 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2001 {
2002         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2003         int ret;
2004
2005         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
2006         if (ret != 0) {
2007                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2008         }
2009
2010         talloc_free(rec->send_election_te);
2011         rec->send_election_te = NULL;
2012 }
2013
2014 /*
2015   handler for memory dumps
2016 */
2017 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2018                              TDB_DATA data, void *private_data)
2019 {
2020         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2021         TDB_DATA *dump;
2022         int ret;
2023         struct rd_memdump_reply *rd;
2024
2025         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2026                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2027                 talloc_free(tmp_ctx);
2028                 return;
2029         }
2030         rd = (struct rd_memdump_reply *)data.dptr;
2031
2032         dump = talloc_zero(tmp_ctx, TDB_DATA);
2033         if (dump == NULL) {
2034                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2035                 talloc_free(tmp_ctx);
2036                 return;
2037         }
2038         ret = ctdb_dump_memory(ctdb, dump);
2039         if (ret != 0) {
2040                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2041                 talloc_free(tmp_ctx);
2042                 return;
2043         }
2044
2045         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
2046
2047         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2048         if (ret != 0) {
2049                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2050                 talloc_free(tmp_ctx);
2051                 return;
2052         }
2053
2054         talloc_free(tmp_ctx);
2055 }
2056
2057 /*
2058   handler for reload_nodes
2059 */
2060 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2061                              TDB_DATA data, void *private_data)
2062 {
2063         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2064
2065         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2066
2067         reload_nodes_file(rec->ctdb);
2068 }
2069
2070
2071 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
2072                               struct timeval yt, void *p)
2073 {
2074         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2075
2076         talloc_free(rec->ip_check_disable_ctx);
2077         rec->ip_check_disable_ctx = NULL;
2078 }
2079
2080
2081 static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
2082                                   struct timeval t, void *p)
2083 {
2084         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2085         struct ctdb_context *ctdb = rec->ctdb;
2086         int ret;
2087
2088         DEBUG(DEBUG_NOTICE,("Rebalance all nodes that have had ip assignment changes.\n"));
2089
2090         ret = ctdb_takeover_run(ctdb, rec->nodemap);
2091         if (ret != 0) {
2092                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
2093                 rec->need_takeover_run = true;
2094         }
2095
2096         talloc_free(rec->deferred_rebalance_ctx);
2097         rec->deferred_rebalance_ctx = NULL;
2098 }
2099
2100         
2101 static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2102                              TDB_DATA data, void *private_data)
2103 {
2104         uint32_t pnn;
2105         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2106
2107         if (data.dsize != sizeof(uint32_t)) {
2108                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2109                 return;
2110         }
2111
2112         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2113                 return;
2114         }
2115
2116         pnn = *(uint32_t *)&data.dptr[0];
2117
2118         lcp2_forcerebalance(ctdb, pnn);
2119         DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
2120
2121         if (rec->deferred_rebalance_ctx != NULL) {
2122                 talloc_free(rec->deferred_rebalance_ctx);
2123         }
2124         rec->deferred_rebalance_ctx = talloc_new(rec);
2125         event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
2126                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2127                         ctdb_rebalance_timeout, rec);
2128 }
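/*
  note: receiving another rebalance message before the timer fires frees
  the old deferred_rebalance_ctx (and with it the pending timed event)
  and re-arms it, so the actual takeover run only happens once no new
  rebalance messages have arrived for deferred_rebalance_on_node_add
  seconds.
*/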
2129
2130
2131
2132 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2133                              TDB_DATA data, void *private_data)
2134 {
2135         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2136         struct ctdb_public_ip *ip;
2137
2138         if (rec->recmaster != rec->ctdb->pnn) {
2139                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2140                 return;
2141         }
2142
2143         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2144                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2145                 return;
2146         }
2147
2148         ip = (struct ctdb_public_ip *)data.dptr;
2149
2150         update_ip_assignment_tree(rec->ctdb, ip);
2151 }
2152
2153
2154 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2155                              TDB_DATA data, void *private_data)
2156 {
2157         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2158         uint32_t timeout;
2159
2160         if (rec->ip_check_disable_ctx != NULL) {
2161                 talloc_free(rec->ip_check_disable_ctx);
2162                 rec->ip_check_disable_ctx = NULL;
2163         }
2164
2165         if (data.dsize != sizeof(uint32_t)) {
2166                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2167                                  "expexting %lu\n", (long unsigned)data.dsize,
2168                                  (long unsigned)sizeof(uint32_t)));
2169                 return;
2170         }
2171         if (data.dptr == NULL) {
2172                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
2173                 return;
2174         }
2175
2176         timeout = *((uint32_t *)data.dptr);
2177
2178         if (timeout == 0) {
2179                 DEBUG(DEBUG_NOTICE,("Reenabling ip check\n"));
2180                 return;
2181         }
2182                 
2183         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
2184
2185         rec->ip_check_disable_ctx = talloc_new(rec);
2186         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2187
2188         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2189 }
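/*
  while rec->ip_check_disable_ctx is non-NULL the remote ip allocation
  check in ctdb_reload_remote_public_ips() is skipped; the timed event
  above (or an explicit message with timeout 0) frees the context and
  re-enables the check.
*/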
2190
2191
2192 /*
2193   handler for reload all ips.
2194 */
2195 static void ip_reloadall_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2196                              TDB_DATA data, void *private_data)
2197 {
2198         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2199
2200         if (data.dsize != sizeof(struct reloadips_all_reply)) {
2201                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2202                 return;
2203         }
2204
2205         reload_all_ips_request = (struct reloadips_all_reply *)talloc_steal(rec, data.dptr);
2206
2207         DEBUG(DEBUG_NOTICE,("RELOAD_ALL_IPS message received from node:%d srvid:%d\n", reload_all_ips_request->pnn, (int)reload_all_ips_request->srvid));
2208         return;
2209 }
2210
2211 static void async_reloadips_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2212 {
2213         uint32_t *status = callback_data;
2214
2215         if (res != 0) {
2216                 DEBUG(DEBUG_ERR,("Reload ips all failed on node %d\n", node_pnn));
2217                 *status = 1;
2218         }
2219 }
2220
2221 static int
2222 reload_all_ips(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, struct reloadips_all_reply *rips)
2223 {
2224         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2225         uint32_t *nodes;
2226         uint32_t status;
2227         int i;
2228
2229         DEBUG(DEBUG_ERR,("RELOAD ALL IPS on all active nodes\n"));
2230         for (i = 0; i< nodemap->num; i++) {
2231                 if (nodemap->nodes[i].flags != 0) {
2232                         DEBUG(DEBUG_ERR, ("Can not reload ips on all nodes. Node %d is not up and healthy\n", i));
2233                         talloc_free(tmp_ctx);
2234                         return -1;
2235                 }
2236         }
2237
2238         /* send the reload public ips control to all connected nodes */
2239         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2240         status = 0;
2241         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
2242                                         nodes, 0,
2243                                         CONTROL_TIMEOUT(),
2244                                         false, tdb_null,
2245                                         async_reloadips_callback, NULL,
2246                                         &status) != 0) {
2247                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2248                 talloc_free(tmp_ctx);
2249                 return -1;
2250         }
2251
2252         if (status != 0) {
2253                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2254                 talloc_free(tmp_ctx);
2255                 return -1;
2256         }
2257
2258         ctdb_client_send_message(ctdb, rips->pnn, rips->srvid, tdb_null);
2259
2260         talloc_free(tmp_ctx);
2261         return 0;
2262 }
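/*
  reload_all_ips() only proceeds when every node in the nodemap is up
  and healthy (flags == 0); on success the original requestor, recorded
  in the reloadips_all_reply, is notified with an empty message.
*/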
2263
2264
2265 /*
2266   handler for ip reallocate, just add it to the list of callers and 
2267   handle this later in the monitor_cluster loop so we do not recurse
2268   with other callers to takeover_run()
2269 */
2270 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2271                              TDB_DATA data, void *private_data)
2272 {
2273         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2274         struct ip_reallocate_list *caller;
2275
2276         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2277                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2278                 return;
2279         }
2280
2281         if (rec->ip_reallocate_ctx == NULL) {
2282                 rec->ip_reallocate_ctx = talloc_new(rec);
2283                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2284         }
2285
2286         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2287         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2288
2289         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2290         caller->next = rec->reallocate_callers;
2291         rec->reallocate_callers = caller;
2292
2293         return;
2294 }
2295
2296 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2297 {
2298         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2299         TDB_DATA result;
2300         int32_t ret;
2301         struct ip_reallocate_list *callers;
2302         uint32_t culprit;
2303
2304         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2305
2306         /* update the list of public ips that a node can handle for
2307            all connected nodes
2308         */
2309         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2310         if (ret != 0) {
2311                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2312                                  culprit));
2313                 rec->need_takeover_run = true;
2314         }
2315         if (ret == 0) {
2316                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2317                 if (ret != 0) {
2318                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2319                         rec->need_takeover_run = true;
2320                 }
2321         }
2322
2323         result.dsize = sizeof(int32_t);
2324         result.dptr  = (uint8_t *)&ret;
2325
2326         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2327
2328                 /* Someone that sent srvid==0 does not want a reply */
2329                 if (callers->rd->srvid == 0) {
2330                         continue;
2331                 }
2332                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2333                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2334                                   (unsigned long long)callers->rd->srvid));
2335                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2336                 if (ret != 0) {
2337                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2338                                          "message to %u:%llu\n",
2339                                          (unsigned)callers->rd->pnn,
2340                                          (unsigned long long)callers->rd->srvid));
2341                 }
2342         }
2343
2344         talloc_free(tmp_ctx);
2345         talloc_free(rec->ip_reallocate_ctx);
2346         rec->ip_reallocate_ctx = NULL;
2347         rec->reallocate_callers = NULL;
2348         
2349 }
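/*
  all queued "ctdb ipreallocate" callers are answered with the result of
  a single takeover run; callers that registered with srvid == 0 are
  fire and forget and receive no reply.
*/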
2350
2351
2352 /*
2353   handler for recovery master elections
2354 */
2355 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2356                              TDB_DATA data, void *private_data)
2357 {
2358         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2359         int ret;
2360         struct election_message *em = (struct election_message *)data.dptr;
2361         TALLOC_CTX *mem_ctx;
2362
2363         /* we got an election packet - update the timeout for the election */
2364         talloc_free(rec->election_timeout);
2365         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2366                                                 fast_start ?
2367                                                 timeval_current_ofs(0, 500000) :
2368                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2369                                                 ctdb_election_timeout, rec);
2370
2371         mem_ctx = talloc_new(ctdb);
2372
2373         /* someone called an election. check their election data
2374            and if we disagree and we would rather be the elected node, 
2375            send a new election message to all other nodes
2376          */
2377         if (ctdb_election_win(rec, em)) {
2378                 if (!rec->send_election_te) {
2379                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2380                                                                 timeval_current_ofs(0, 500000),
2381                                                                 election_send_request, rec);
2382                 }
2383                 talloc_free(mem_ctx);
2384                 /*unban_all_nodes(ctdb);*/
2385                 return;
2386         }
2387         
2388         /* we didn't win */
2389         talloc_free(rec->send_election_te);
2390         rec->send_election_te = NULL;
2391
2392         if (ctdb->tunable.verify_recovery_lock != 0) {
2393                 /* release the recmaster lock */
2394                 if (em->pnn != ctdb->pnn &&
2395                     ctdb->recovery_lock_fd != -1) {
2396                         close(ctdb->recovery_lock_fd);
2397                         ctdb->recovery_lock_fd = -1;
2398                         unban_all_nodes(ctdb);
2399                 }
2400         }
2401
2402         /* ok, let that guy become recmaster then */
2403         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2404         if (ret != 0) {
2405                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2406                 talloc_free(mem_ctx);
2407                 return;
2408         }
2409
2410         talloc_free(mem_ctx);
2411         return;
2412 }
2413
2414
2415 /*
2416   force the start of the election process
2417  */
2418 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2419                            struct ctdb_node_map *nodemap)
2420 {
2421         int ret;
2422         struct ctdb_context *ctdb = rec->ctdb;
2423
2424         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2425
2426         /* set all nodes to recovery mode to stop all internode traffic */
2427         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2428         if (ret != 0) {
2429                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2430                 return;
2431         }
2432
2433         talloc_free(rec->election_timeout);
2434         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2435                                                 fast_start ?
2436                                                 timeval_current_ofs(0, 500000) :
2437                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2438                                                 ctdb_election_timeout, rec);
2439
2440         ret = send_election_request(rec, pnn, true);
2441         if (ret!=0) {
2442                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2443                 return;
2444         }
2445
2446         /* wait for a few seconds to collect all responses */
2447         ctdb_wait_election(rec);
2448 }
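/*
  forcing an election first switches the whole cluster into recovery
  mode to stop internode traffic, then broadcasts our election data
  (claiming the recmaster role locally) and waits in ctdb_wait_election()
  for the other nodes to respond.
*/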
2449
2450
2451
2452 /*
2453   handler for when a node changes its flags
2454 */
2455 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2456                             TDB_DATA data, void *private_data)
2457 {
2458         int ret;
2459         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2460         struct ctdb_node_map *nodemap=NULL;
2461         TALLOC_CTX *tmp_ctx;
2462         int i;
2463         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2464         int disabled_flag_changed;
2465
2466         if (data.dsize != sizeof(*c)) {
2467                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2468                 return;
2469         }
2470
2471         tmp_ctx = talloc_new(ctdb);
2472         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2473
2474         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2475         if (ret != 0) {
2476                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2477                 talloc_free(tmp_ctx);
2478                 return;         
2479         }
2480
2481
2482         for (i=0;i<nodemap->num;i++) {
2483                 if (nodemap->nodes[i].pnn == c->pnn) break;
2484         }
2485
2486         if (i == nodemap->num) {
2487                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2488                 talloc_free(tmp_ctx);
2489                 return;
2490         }
2491
2492         if (nodemap->nodes[i].flags != c->new_flags) {
2493                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, nodemap->nodes[i].flags));
2494         }
2495
2496         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2497
2498         nodemap->nodes[i].flags = c->new_flags;
2499
2500         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2501                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2502
2503         if (ret == 0) {
2504                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2505                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2506         }
2507         
2508         if (ret == 0 &&
2509             ctdb->recovery_master == ctdb->pnn &&
2510             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2511                 /* Only do the takeover run if the permanently disabled or
2512                    unhealthy flags changed, since these cause an ip failover
2513                    but not a recovery.
2514                    If the node became disconnected or banned, that also leads
2515                    to an ip address failover, but it is handled during
2516                    recovery.
2517                 */
2518                 if (disabled_flag_changed) {
2519                         rec->need_takeover_run = true;
2520                 }
2521         }
2522
2523         talloc_free(tmp_ctx);
2524 }
2525
2526 /*
2527   handler for when we need to push out flag changes to all other nodes
2528 */
2529 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2530                             TDB_DATA data, void *private_data)
2531 {
2532         int ret;
2533         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2534         struct ctdb_node_map *nodemap=NULL;
2535         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2536         uint32_t recmaster;
2537         uint32_t *nodes;
2538
2539         /* find the recovery master */
2540         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2541         if (ret != 0) {
2542                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2543                 talloc_free(tmp_ctx);
2544                 return;
2545         }
2546
2547         /* read the node flags from the recmaster */
2548         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2549         if (ret != 0) {
2550                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2551                 talloc_free(tmp_ctx);
2552                 return;
2553         }
2554         if (c->pnn >= nodemap->num) {
2555                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2556                 talloc_free(tmp_ctx);
2557                 return;
2558         }
2559
2560         /* send the flags update to all connected nodes */
2561         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2562
2563         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2564                                       nodes, 0, CONTROL_TIMEOUT(),
2565                                       false, data,
2566                                       NULL, NULL,
2567                                       NULL) != 0) {
2568                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2569
2570                 talloc_free(tmp_ctx);
2571                 return;
2572         }
2573
2574         talloc_free(tmp_ctx);
2575 }
2576
2577
2578 struct verify_recmode_normal_data {
2579         uint32_t count;
2580         enum monitor_result status;
2581 };
2582
2583 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2584 {
2585         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2586
2587
2588         /* one more node has responded with recmode data*/
2589         rmdata->count--;
2590
2591         /* if we failed to get the recmode, then return an error and let
2592            the main loop try again.
2593         */
2594         if (state->state != CTDB_CONTROL_DONE) {
2595                 if (rmdata->status == MONITOR_OK) {
2596                         rmdata->status = MONITOR_FAILED;
2597                 }
2598                 return;
2599         }
2600
2601         /* if we got a response, then the recmode will be stored in the
2602            status field
2603         */
2604         if (state->status != CTDB_RECOVERY_NORMAL) {
2605                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2606                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2607         }
2608
2609         return;
2610 }
2611
2612
2613 /* verify that all nodes are in normal recovery mode */
2614 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2615 {
2616         struct verify_recmode_normal_data *rmdata;
2617         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2618         struct ctdb_client_control_state *state;
2619         enum monitor_result status;
2620         int j;
2621         
2622         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2623         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2624         rmdata->count  = 0;
2625         rmdata->status = MONITOR_OK;
2626
2627         /* loop over all active nodes and send an async getrecmode call to 
2628            them*/
2629         for (j=0; j<nodemap->num; j++) {
2630                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2631                         continue;
2632                 }
2633                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2634                                         CONTROL_TIMEOUT(), 
2635                                         nodemap->nodes[j].pnn);
2636                 if (state == NULL) {
2637                         /* we failed to send the control, treat this as 
2638                            an error and try again next iteration
2639                         */                      
2640                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2641                         talloc_free(mem_ctx);
2642                         return MONITOR_FAILED;
2643                 }
2644
2645                 /* set up the callback functions */
2646                 state->async.fn = verify_recmode_normal_callback;
2647                 state->async.private_data = rmdata;
2648
2649                 /* one more control to wait for to complete */
2650                 rmdata->count++;
2651         }
2652
2653
2654         /* now wait for up to the maximum number of seconds allowed
2655            or until all nodes we expect a response from have replied
2656         */
2657         while (rmdata->count > 0) {
2658                 event_loop_once(ctdb->ev);
2659         }
2660
2661         status = rmdata->status;
2662         talloc_free(mem_ctx);
2663         return status;
2664 }
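/*
  verify_recmode() fans out an async getrecmode request to every active
  node and pumps the event loop until all replies (or timeouts) have
  arrived; any node still in recovery mode turns the result into
  MONITOR_RECOVERY_NEEDED so the main loop restarts recovery.
*/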
2665
2666
2667 struct verify_recmaster_data {
2668         struct ctdb_recoverd *rec;
2669         uint32_t count;
2670         uint32_t pnn;
2671         enum monitor_result status;
2672 };
2673
2674 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2675 {
2676         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2677
2678
2679         /* one more node has responded with recmaster data*/
2680         rmdata->count--;
2681
2682         /* if we failed to get the recmaster, then return an error and let
2683            the main loop try again.
2684         */
2685         if (state->state != CTDB_CONTROL_DONE) {
2686                 if (rmdata->status == MONITOR_OK) {
2687                         rmdata->status = MONITOR_FAILED;
2688                 }
2689                 return;
2690         }
2691
2692         /* if we got a response, then the recmaster will be stored in the
2693            status field
2694         */
2695         if (state->status != rmdata->pnn) {
2696                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2697                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2698                 rmdata->status = MONITOR_ELECTION_NEEDED;
2699         }
2700
2701         return;
2702 }
2703
2704
2705 /* verify that all nodes agree that we are the recmaster */
2706 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2707 {
2708         struct ctdb_context *ctdb = rec->ctdb;
2709         struct verify_recmaster_data *rmdata;
2710         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2711         struct ctdb_client_control_state *state;
2712         enum monitor_result status;
2713         int j;
2714         
2715         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2716         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2717         rmdata->rec    = rec;
2718         rmdata->count  = 0;
2719         rmdata->pnn    = pnn;
2720         rmdata->status = MONITOR_OK;
2721
2722         /* loop over all active nodes and send an async getrecmaster call to 
2723            them*/
2724         for (j=0; j<nodemap->num; j++) {
2725                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2726                         continue;
2727                 }
2728                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2729                                         CONTROL_TIMEOUT(),
2730                                         nodemap->nodes[j].pnn);
2731                 if (state == NULL) {
2732                         /* we failed to send the control, treat this as 
2733                            an error and try again next iteration
2734                         */                      
2735                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2736                         talloc_free(mem_ctx);
2737                         return MONITOR_FAILED;
2738                 }
2739
2740                 /* set up the callback functions */
2741                 state->async.fn = verify_recmaster_callback;
2742                 state->async.private_data = rmdata;
2743
2744                 /* one more control to wait for to complete */
2745                 rmdata->count++;
2746         }
2747
2748
2749         /* now wait for up to the maximum number of seconds allowed
2750            or until all nodes we expect a response from have replied
2751         */
2752         while (rmdata->count > 0) {
2753                 event_loop_once(ctdb->ev);
2754         }
2755
2756         status = rmdata->status;
2757         talloc_free(mem_ctx);
2758         return status;
2759 }
2760
2761
2762 /* called to check that the local allocation of public ip addresses is ok.
2763 */
2764 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2765 {
2766         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2767         struct ctdb_control_get_ifaces *ifaces = NULL;
2768         struct ctdb_all_public_ips *ips = NULL;
2769         struct ctdb_uptime *uptime1 = NULL;
2770         struct ctdb_uptime *uptime2 = NULL;
2771         int ret, j;
2772         bool need_iface_check = false;
2773         bool need_takeover_run = false;
2774
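        /* read the recovery timestamps (uptime) now and again after we have
           fetched the interface and public ip information below; if a recovery
           starts or finishes in between, the ip checks would race with the
           takeover run and are skipped for this iteration
        */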
2775         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2776                                 CTDB_CURRENT_NODE, &uptime1);
2777         if (ret != 0) {
2778                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2779                 talloc_free(mem_ctx);
2780                 return -1;
2781         }
2782
2783
2784         /* read the interfaces from the local node */
2785         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2786         if (ret != 0) {
2787                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2788                 talloc_free(mem_ctx);
2789                 return -1;
2790         }
2791
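        /* compare the freshly read interface list against the cached copy;
           any difference in count or content forces a takeover run below */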
2792         if (!rec->ifaces) {
2793                 need_iface_check = true;
2794         } else if (rec->ifaces->num != ifaces->num) {
2795                 need_iface_check = true;
2796         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2797                 need_iface_check = true;
2798         }
2799
2800         talloc_free(rec->ifaces);
2801         rec->ifaces = talloc_steal(rec, ifaces);
2802
2803         if (need_iface_check) {
2804                 DEBUG(DEBUG_NOTICE, ("The interface status has changed on "
2805                                      "local node %u - force takeover run\n",
2806                                      pnn));
2807                 need_takeover_run = true;
2808         }
2809
2810         /* read the ip allocation from the local node */
2811         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2812         if (ret != 0) {
2813                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2814                 talloc_free(mem_ctx);
2815                 return -1;
2816         }
2817
2818         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2819                                 CTDB_CURRENT_NODE, &uptime2);
2820         if (ret != 0) {
2821                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2822                 talloc_free(mem_ctx);
2823                 return -1;
2824         }
2825
2826         /* skip the check if the startrecovery time has changed */
2827         if (timeval_compare(&uptime1->last_recovery_started,
2828                             &uptime2->last_recovery_started) != 0) {
2829                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2830                 talloc_free(mem_ctx);
2831                 return 0;
2832         }
2833
2834         /* skip the check if the endrecovery time has changed */
2835         if (timeval_compare(&uptime1->last_recovery_finished,
2836                             &uptime2->last_recovery_finished) != 0) {
2837                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2838                 talloc_free(mem_ctx);
2839                 return 0;
2840         }
2841
2842         /* skip the check if we have started but not finished recovery */
2843         if (timeval_compare(&uptime1->last_recovery_finished,
2844                             &uptime1->last_recovery_started) != 1) {
2845                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2846                 talloc_free(mem_ctx);
2847
2848                 return 0;
2849         }
2850
2851         /* verify that we have the ip addresses we should have
2852            and that we don't have ones we shouldn't have.
2853            if we find an inconsistency we flag that a takeover run is
2854            needed, and release any address that we are still holding
2855            but should no longer serve.
2856            also, if an address has pnn -1 (unassigned) and we are healthy
2857            and could host it, we request an ip reallocation.
2858         */
2859         if (ctdb->tunable.disable_ip_failover == 0) {
2860                 for (j=0; j<ips->num; j++) {
2861                         if (ips->ips[j].pnn == -1 && nodemap->nodes[pnn].flags == 0) {
2862                                 DEBUG(DEBUG_CRIT,("Public address '%s' is not assigned and we could serve this ip\n",
2863                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2864                                 need_takeover_run = true;
2865                         } else if (ips->ips[j].pnn == pnn) {
2866                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
2867                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2868                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2869                                         need_takeover_run = true;
2870                                 }
2871                         } else {
2872                                 if (ctdb->do_checkpublicip && ctdb_sys_have_ip(&ips->ips[j].addr)) {
2873
2874                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving. Removing it.\n", 
2875                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2876
2877                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
2878                                                 DEBUG(DEBUG_ERR,("Failed to release local ip address\n"));
2879                                         }
2880                                 }
2881                         }
2882                 }
2883         }
2884
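        /* something was inconsistent - ask the recovery master to perform a takeover run */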
2885         if (need_takeover_run) {
2886                 struct takeover_run_reply rd;
2887                 TDB_DATA data;
2888
2889                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2890
2891                 rd.pnn = ctdb->pnn;
2892                 rd.srvid = 0;
2893                 data.dptr = (uint8_t *)&rd;
2894                 data.dsize = sizeof(rd);
2895
2896                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2897                 if (ret != 0) {
2898                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2899                 }
2900         }
2901         talloc_free(mem_ctx);
2902         return 0;
2903 }
2904
2905
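/* callback for the async GET_NODEMAP control below; stores each node's
   nodemap in the array slot indexed by its pnn */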
2906 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2907 {
2908         struct ctdb_node_map **remote_nodemaps = callback_data;
2909
2910         if (node_pnn >= ctdb->num_nodes) {
2911                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2912                 return;
2913         }
2914
2915         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2916
2917 }
2918
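/* ask every active node for its nodemap in parallel; the results are
   collected into remote_nodemaps[] by the callback above */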
2919 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2920         struct ctdb_node_map *nodemap,
2921         struct ctdb_node_map **remote_nodemaps)
2922 {
2923         uint32_t *nodes;
2924
2925         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2926         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2927                                         nodes, 0,
2928                                         CONTROL_TIMEOUT(), false, tdb_null,
2929                                         async_getnodemap_callback,
2930                                         NULL,
2931                                         remote_nodemaps) != 0) {
2932                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2933
2934                 return -1;
2935         }
2936
2937         return 0;
2938 }
2939
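/* state used by check_recovery_lock(): a forked child reads the reclock
   file and reports the result back over a pipe, while a timed event
   protects against the read blocking forever */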
2940 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2941 struct ctdb_check_reclock_state {
2942         struct ctdb_context *ctdb;
2943         struct timeval start_time;
2944         int fd[2];
2945         pid_t child;
2946         struct timed_event *te;
2947         struct fd_event *fde;
2948         enum reclock_child_status status;
2949 };
2950
2951 /* when we free the reclock state we must kill any child process.
2952 */
2953 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2954 {
2955         struct ctdb_context *ctdb = state->ctdb;
2956
2957         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2958
2959         if (state->fd[0] != -1) {
2960                 close(state->fd[0]);
2961                 state->fd[0] = -1;
2962         }
2963         if (state->fd[1] != -1) {
2964                 close(state->fd[1]);
2965                 state->fd[1] = -1;
2966         }
2967         ctdb_kill(ctdb, state->child, SIGKILL);
2968         return 0;
2969 }
2970
2971 /*
2972   called if our check_reclock child times out. this would happen if
2973   i/o to the reclock file blocks.
2974  */
2975 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2976                                          struct timeval t, void *private_data)
2977 {
2978         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2979                                            struct ctdb_check_reclock_state);
2980
2981         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - CFS slow to grant locks?\n"));
2982         state->status = RECLOCK_TIMEOUT;
2983 }
2984
2985 /* this is called when the child process has completed checking the reclock
2986    file and has written data back to us through the pipe.
2987 */
2988 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2989                              uint16_t flags, void *private_data)
2990 {
2991         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2992                                              struct ctdb_check_reclock_state);
2993         char c = 0;
2994         int ret;
2995
2996         /* we got a response from our child process so we can abort the
2997            timeout.
2998         */
2999         talloc_free(state->te);
3000         state->te = NULL;
3001
3002         ret = read(state->fd[0], &c, 1);
3003         if (ret != 1 || c != RECLOCK_OK) {
3004                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3005                 state->status = RECLOCK_FAILED;
3006
3007                 return;
3008         }
3009
3010         state->status = RECLOCK_OK;
3011         return;
3012 }
3013
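/* verify that the recovery lock file is still accessible: fork a child
   that does a single pread() on the already open recovery_lock_fd and
   reports the result back through a pipe. The parent waits for the reply,
   guarded by a 15 second timeout; if the child reports a read failure the
   lock fd is closed and the check fails.
 */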
3014 static int check_recovery_lock(struct ctdb_context *ctdb)
3015 {
3016         int ret;
3017         struct ctdb_check_reclock_state *state;
3018         pid_t parent = getpid();
3019
3020         if (ctdb->recovery_lock_fd == -1) {
3021                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3022                 return -1;
3023         }
3024
3025         state = talloc(ctdb, struct ctdb_check_reclock_state);
3026         CTDB_NO_MEMORY(ctdb, state);
3027
3028         state->ctdb = ctdb;
3029         state->start_time = timeval_current();
3030         state->status = RECLOCK_CHECKING;
3031         state->fd[0] = -1;
3032         state->fd[1] = -1;
3033
3034         ret = pipe(state->fd);
3035         if (ret != 0) {
3036                 talloc_free(state);
3037                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3038                 return -1;
3039         }
3040
3041         state->child = ctdb_fork(ctdb);
3042         if (state->child == (pid_t)-1) {
3043                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3044                 close(state->fd[0]);
3045                 state->fd[0] = -1;
3046                 close(state->fd[1]);
3047                 state->fd[1] = -1;
3048                 talloc_free(state);
3049                 return -1;
3050         }
3051
3052         if (state->child == 0) {
3053                 char cc = RECLOCK_OK;
3054                 close(state->fd[0]);
3055                 state->fd[0] = -1;
3056
3057                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3058                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3059                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3060                         cc = RECLOCK_FAILED;
3061                 }
3062
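                /* report the result of the read back to the parent */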
3063                 write(state->fd[1], &cc, 1);
3064                 /* make sure we die when our parent dies */
3065                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3066                         sleep(5);
3067                         write(state->fd[1], &cc, 1);
3068                 }
3069                 _exit(0);
3070         }
3071         close(state->fd[1]);
3072         state->fd[1] = -1;
3073         set_close_on_exec(state->fd[0]);
3074
3075         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3076
3077         talloc_set_destructor(state, check_reclock_destructor);
3078
3079         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3080                                     ctdb_check_reclock_timeout, state);
3081         if (state->te == NULL) {
3082                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3083                 talloc_free(state);
3084                 return -1;
3085         }
3086
3087         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3088                                 EVENT_FD_READ,
3089                                 reclock_child_handler,
3090                                 (void *)state);
3091
3092         if (state->fde == NULL) {
3093                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3094                 talloc_free(state);
3095                 return -1;
3096         }
3097         tevent_fd_set_auto_close(state->fde);
3098
3099         while (state->status == RECLOCK_CHECKING) {
3100                 event_loop_once(ctdb->ev);
3101         }
3102
3103         if (state->status == RECLOCK_FAILED) {
3104                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3105                 close(ctdb->recovery_lock_fd);
3106                 ctdb->recovery_lock_fd = -1;
3107                 talloc_free(state);
3108                 return -1;
3109         }
3110
3111         talloc_free(state);
3112         return 0;
3113 }
3114
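/* fetch the current reclock file setting from the local ctdb daemon and
   keep the recovery daemon's cached copy in sync; if the setting has been
   removed or changed, close any open lock fd and clear
   verify_recovery_lock for this iteration
 */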
3115 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3116 {
3117         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3118         const char *reclockfile;
3119
3120         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3121                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3122                 talloc_free(tmp_ctx);
3123                 return -1;      
3124         }
3125
3126         if (reclockfile == NULL) {
3127                 if (ctdb->recovery_lock_file != NULL) {
3128                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3129                         talloc_free(ctdb->recovery_lock_file);
3130                         ctdb->recovery_lock_file = NULL;
3131                         if (ctdb->recovery_lock_fd != -1) {
3132                                 close(ctdb->recovery_lock_fd);
3133                                 ctdb->recovery_lock_fd = -1;
3134                         }
3135                 }
3136                 ctdb->tunable.verify_recovery_lock = 0;
3137                 talloc_free(tmp_ctx);
3138                 return 0;
3139         }
3140
3141         if (ctdb->recovery_lock_file == NULL) {
3142                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3143                 if (ctdb->recovery_lock_fd != -1) {
3144                         close(ctdb->recovery_lock_fd);
3145                         ctdb->recovery_lock_fd = -1;
3146                 }
3147                 talloc_free(tmp_ctx);
3148                 return 0;
3149         }
3150
3151
3152         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3153                 talloc_free(tmp_ctx);
3154                 return 0;
3155         }
3156
3157         talloc_free(ctdb->recovery_lock_file);
3158         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3159         ctdb->tunable.verify_recovery_lock = 0;
3160         if (ctdb->recovery_lock_fd != -1) {
3161                 close(ctdb->recovery_lock_fd);
3162                 ctdb->recovery_lock_fd = -1;
3163         }
3164
3165         talloc_free(tmp_ctx);
3166         return 0;
3167 }
3168
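/* one pass of the recovery daemon monitoring logic: check that the main
   daemon is still alive, refresh tunables, the nodemap and the vnnmap,
   make sure a usable recovery master exists (forcing an election if not)
   and, if we are the recmaster, verify cluster wide consistency and start
   a recovery or takeover run when needed
 */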
3169 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3170                       TALLOC_CTX *mem_ctx)
3171 {
3172         uint32_t pnn;
3173         struct ctdb_node_map *nodemap=NULL;
3174         struct ctdb_node_map *recmaster_nodemap=NULL;
3175         struct ctdb_node_map **remote_nodemaps=NULL;
3176         struct ctdb_vnn_map *vnnmap=NULL;
3177         struct ctdb_vnn_map *remote_vnnmap=NULL;
3178         int32_t debug_level;
3179         int i, j, ret;
3180
3181
3182
3183         /* verify that the main daemon is still running */
3184         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3185                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3186                 exit(-1);
3187         }
3188
3189         /* ping the local daemon to tell it we are alive */
3190         ctdb_ctrl_recd_ping(ctdb);
3191
3192         if (rec->election_timeout) {
3193                 /* an election is in progress */
3194                 return;
3195         }
3196
3197         /* read the debug level from the parent and update locally */
3198         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3199         if (ret !=0) {
3200                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3201                 return;
3202         }
3203         LogLevel = debug_level;
3204
3205
3206         /* We must check if we need to ban a node here but we want to do this
3207            as early as possible so we don't wait until we have pulled the node
3208            map from the local node. That's why we have the hardcoded value 20
3209         */
3210         for (i=0; i<ctdb->num_nodes; i++) {
3211                 struct ctdb_banning_state *ban_state;
3212
3213                 if (ctdb->nodes[i]->ban_state == NULL) {
3214                         continue;
3215                 }
3216                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
3217                 if (ban_state->count < 20) {
3218                         continue;
3219                 }
3220                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
3221                         ctdb->nodes[i]->pnn, ban_state->count,
3222                         ctdb->tunable.recovery_ban_period));
3223                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
3224                 ban_state->count = 0;
3225         }
3226
3227         /* get relevant tunables */
3228         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3229         if (ret != 0) {
3230                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3231                 return;
3232         }
3233
3234         /* get the current recovery lock file from the server */
3235         if (update_recovery_lock_file(ctdb) != 0) {
3236                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3237                 return;
3238         }
3239
3240         /* Make sure that if recovery lock verification becomes disabled,
3241            we close the file
3242         */
3243         if (ctdb->tunable.verify_recovery_lock == 0) {
3244                 if (ctdb->recovery_lock_fd != -1) {
3245                         close(ctdb->recovery_lock_fd);
3246                         ctdb->recovery_lock_fd = -1;
3247                 }
3248         }
3249
3250         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3251         if (pnn == (uint32_t)-1) {
3252                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
3253                 return;
3254         }
3255
3256         /* get the vnnmap */
3257         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3258         if (ret != 0) {
3259                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3260                 return;
3261         }
3262
3263
3264         /* get number of nodes */
3265         if (rec->nodemap) {
3266                 talloc_free(rec->nodemap);
3267                 rec->nodemap = NULL;
3268                 nodemap=NULL;
3269         }
3270         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3271         if (ret != 0) {
3272                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3273                 return;
3274         }
3275         nodemap = rec->nodemap;
3276
3277         /* update the capabilities for all nodes */
3278         ret = update_capabilities(ctdb, nodemap);
3279         if (ret != 0) {
3280                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3281                 return;
3282         }
3283
3284         /* check which node is the recovery master */
3285         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3286         if (ret != 0) {
3287                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3288                 return;
3289         }
3290
3291         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3292         if (rec->recmaster != pnn) {
3293                 if (rec->ip_reallocate_ctx != NULL) {
3294                         talloc_free(rec->ip_reallocate_ctx);
3295                         rec->ip_reallocate_ctx = NULL;
3296                         rec->reallocate_callers = NULL;
3297                 }
3298         }
3299
3300         if (rec->recmaster == (uint32_t)-1) {
3301                 DEBUG(DEBUG_NOTICE,(__location__ " No recovery master set yet - forcing election\n"));
3302                 force_election(rec, pnn, nodemap);
3303                 return;
3304         }
3305
3306         /* if the local daemon is STOPPED, we verify that the databases are
3307            also frozen and that the recmode is set to active
3308         */
3309         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3310                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3311                 if (ret != 0) {
3312                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3313                 }
3314                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3315                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3316
3317                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3318                         if (ret != 0) {
3319                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3320                                 return;
3321                         }
3322                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3323                         if (ret != 0) {
3324                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3325
3326                                 return;
3327                         }
3328                         return;
3329                 }
3330         }
3331         /* If the local node is inactive (stopped or banned), verify we are
3332            not the recmaster and yield that role if we are
3333         */
3334         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE) && (rec->recmaster == pnn)) {
3335                 DEBUG(DEBUG_ERR,("Local node is INACTIVE. Yielding recmaster role\n"));
3336                 force_election(rec, pnn, nodemap);
3337                 return;
3338         }
3339         
3340         /*
3341          * if the current recmaster does not have CTDB_CAP_RECMASTER,
3342          * but we do, force an election and try to become the new
3343          * recmaster
3344          */
3345         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3346             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3347              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3348                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3349                                   " but we (node %u) have - force an election\n",
3350                                   rec->recmaster, pnn));
3351                 force_election(rec, pnn, nodemap);
3352                 return;
3353         }
3354
3355         /* check that we (recovery daemon) and the local ctdb daemon
3356            agree on whether we are banned or not
3357         */
3358 //qqq
3359
3360         /* remember our own node flags */
3361         rec->node_flags = nodemap->nodes[pnn].flags;
3362
3363         /* count how many active nodes there are */
3364         rec->num_active    = 0;
3365         rec->num_connected = 0;
3366         for (i=0; i<nodemap->num; i++) {
3367                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3368                         rec->num_active++;
3369                 }
3370                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3371                         rec->num_connected++;
3372                 }
3373         }
3374
3375
3376         /* verify that the recmaster node is still active */
3377         for (j=0; j<nodemap->num; j++) {
3378                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3379                         break;
3380                 }
3381         }
3382
3383         if (j == nodemap->num) {
3384                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3385                 force_election(rec, pnn, nodemap);
3386                 return;
3387         }
3388
3389         /* if recovery master is disconnected we must elect a new recmaster */
3390         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3391                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3392                 force_election(rec, pnn, nodemap);
3393                 return;
3394         }
3395
3396         /* get nodemap from the recovery master to check if it is inactive */
3397         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3398                                    mem_ctx, &recmaster_nodemap);
3399         if (ret != 0) {
3400                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3401                           nodemap->nodes[j].pnn));
3402                 return;
3403         }
3404
3405
3406         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3407             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3408                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3409                 force_election(rec, pnn, nodemap);
3410                 return;
3411         }
3412
3413         /* If this node is stopped then it is not the recovery master
3414          * so the only remaining action is potentially to verify
3415          * the local IP allocation below.  This won't accomplish
3416          * anything useful so skip it.
3417          */
3418         if (rec->node_flags & NODE_FLAGS_STOPPED) {
3419                 return;
3420         }
3421
3422         /* verify that we have all ip addresses we should have and we don't
3423          * have addresses we shouldn't have.
3424          */ 
3425         if (ctdb->tunable.disable_ip_failover == 0) {
3426                 if (rec->ip_check_disable_ctx == NULL) {
3427                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3428                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3429                         }
3430                 }
3431         }
3432
3433
3434         /* if we are not the recmaster then we do not need to check
3435            if recovery is needed
3436          */
3437         if (pnn != rec->recmaster) {
3438                 return;
3439         }
3440
3441
3442         /* ensure our local copies of flags are right */
3443         ret = update_local_flags(rec, nodemap);
3444         if (ret == MONITOR_ELECTION_NEEDED) {
3445                 DEBUG(DEBUG_NOTICE,("update_local_flags() indicated that a re-election is needed.\n"));
3446                 force_election(rec, pnn, nodemap);
3447                 return;
3448         }
3449         if (ret != MONITOR_OK) {
3450                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3451                 return;
3452         }
3453
3454         if (ctdb->num_nodes != nodemap->num) {
3455                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3456                 reload_nodes_file(ctdb);
3457                 return;
3458         }
3459
3460         /* verify that all active nodes agree that we are the recmaster */
3461         switch (verify_recmaster(rec, nodemap, pnn)) {
3462         case MONITOR_RECOVERY_NEEDED:
3463                 /* can not happen */
3464                 return;
3465         case MONITOR_ELECTION_NEEDED:
3466                 force_election(rec, pnn, nodemap);
3467                 return;
3468         case MONITOR_OK:
3469                 break;
3470         case MONITOR_FAILED:
3471                 return;
3472         }
3473
3474
3475         if (rec->need_recovery) {
3476                 /* a previous recovery didn't finish */
3477                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3478                 return;
3479         }
3480
3481         /* verify that all active nodes are in normal mode 
3482            and not in recovery mode 
3483         */
3484         switch (verify_recmode(ctdb, nodemap)) {
3485         case MONITOR_RECOVERY_NEEDED:
3486                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3487                 return;
3488         case MONITOR_FAILED:
3489                 return;
3490         case MONITOR_ELECTION_NEEDED:
3491                 /* can not happen */
3492         case MONITOR_OK:
3493                 break;
3494         }
3495
3496
3497         if (ctdb->tunable.verify_recovery_lock != 0) {
3498                 /* we should have the reclock - check it's not stale */
3499                 ret = check_recovery_lock(ctdb);
3500                 if (ret != 0) {
3501                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3502                         ctdb_set_culprit(rec, ctdb->pnn);
3503                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3504                         return;
3505                 }
3506         }
3507
3508
3509         /* is there a pending reload-all-ips request? */
3510         if (reload_all_ips_request != NULL) {
3511                 reload_all_ips(ctdb, rec, nodemap, reload_all_ips_request);
3512                 talloc_free(reload_all_ips_request);
3513                 reload_all_ips_request = NULL;
3514         }
3515
3516         /* if there are takeovers requested, perform them and notify the waiters */
3517         if (rec->reallocate_callers) {
3518                 process_ipreallocate_requests(ctdb, rec);
3519         }
3520
3521         /* get the nodemap for all active remote nodes
3522          */
3523         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3524         if (remote_nodemaps == NULL) {
3525                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3526                 return;
3527         }
3528         for(i=0; i<nodemap->num; i++) {
3529                 remote_nodemaps[i] = NULL;
3530         }
3531         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3532                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3533                 return;
3534         } 
3535
3536         /* verify that all other nodes have the same nodemap as we have
3537         */
3538         for (j=0; j<nodemap->num; j++) {
3539                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3540                         continue;
3541                 }
3542
3543                 if (remote_nodemaps[j] == NULL) {
3544                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3545                         ctdb_set_culprit(rec, j);
3546
3547                         return;
3548                 }
3549
3550                 /* if the nodes disagree on how many nodes there are
3551                    then this is a good reason to try recovery
3552                  */
3553                 if (remote_nodemaps[j]->num != nodemap->num) {
3554                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3555                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3556                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3557                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3558                         return;
3559                 }
3560
3561                 /* if the nodes disagree on which nodes exist and are
3562                    active, then that is also a good reason to do recovery
3563                  */
3564                 for (i=0;i<nodemap->num;i++) {
3565                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3566                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3567                                           nodemap->nodes[j].pnn, i, 
3568                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3569                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3570                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3571                                             vnnmap);
3572                                 return;
3573                         }
3574                 }
3575
3576                 /* verify the flags are consistent
3577                 */
3578                 for (i=0; i<nodemap->num; i++) {
3579                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3580                                 continue;
3581                         }
3582                         
3583                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3584                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3585                                   nodemap->nodes[j].pnn, 
3586                                   nodemap->nodes[i].pnn, 
3587                                   remote_nodemaps[j]->nodes[i].flags,
3588                                   nodemap->nodes[i].flags));
3589                                 if (i == j) {
3590                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3591                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3592                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3593                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3594                                                     vnnmap);
3595                                         return;
3596                                 } else {
3597                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3598                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3599                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3600                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3601                                                     vnnmap);
3602                                         return;
3603                                 }
3604                         }
3605                 }
3606         }
3607
3608
3609         /* there had better be the same number of lmasters in the vnn map
3610            as there are active nodes or we will have to do a recovery
3611          */
3612         if (vnnmap->size != rec->num_active) {
3613                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3614                           vnnmap->size, rec->num_active));
3615                 ctdb_set_culprit(rec, ctdb->pnn);
3616                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3617                 return;
3618         }
3619
3620         /* verify that all active nodes in the nodemap also exist in 
3621            the vnnmap.
3622          */
3623         for (j=0; j<nodemap->num; j++) {
3624                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3625                         continue;
3626                 }
3627                 if (nodemap->nodes[j].pnn == pnn) {
3628                         continue;
3629                 }
3630
3631                 for (i=0; i<vnnmap->size; i++) {
3632                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3633                                 break;
3634                         }
3635                 }
3636                 if (i == vnnmap->size) {
3637                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3638                                   nodemap->nodes[j].pnn));
3639                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3640                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3641                         return;
3642                 }
3643         }
3644
3645         
3646         /* verify that all other nodes have the same vnnmap
3647            and are from the same generation
3648          */
3649         for (j=0; j<nodemap->num; j++) {
3650                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3651                         continue;
3652                 }
3653                 if (nodemap->nodes[j].pnn == pnn) {
3654                         continue;
3655                 }
3656
3657                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3658                                           mem_ctx, &remote_vnnmap);
3659                 if (ret != 0) {
3660                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3661                                   nodemap->nodes[j].pnn));
3662                         return;
3663                 }
3664
3665                 /* verify the vnnmap generation is the same */
3666                 if (vnnmap->generation != remote_vnnmap->generation) {
3667                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3668                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3669                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3670                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3671                         return;
3672                 }
3673
3674                 /* verify the vnnmap size is the same */
3675                 if (vnnmap->size != remote_vnnmap->size) {
3676                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3677                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3678                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3679                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3680                         return;
3681                 }
3682
3683                 /* verify the vnnmap is the same */
3684                 for (i=0;i<vnnmap->size;i++) {
3685                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3686                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3687                                           nodemap->nodes[j].pnn));
3688                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3689                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3690                                             vnnmap);
3691                                 return;
3692                         }
3693                 }
3694         }
3695
3696         /* we might need to change who has what IP assigned */
3697         if (rec->need_takeover_run) {
3698                 uint32_t culprit = (uint32_t)-1;
3699
3700                 rec->need_takeover_run = false;
3701
3702                 /* update the list of public ips that a node can handle for
3703                    all connected nodes
3704                 */
3705                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3706                 if (ret != 0) {
3707                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3708                                          culprit));
3709                         rec->need_takeover_run = true;
3710                         return;
3711                 }
3712
3713                 /* execute the "startrecovery" event script on all nodes */
3714                 ret = run_startrecovery_eventscript(rec, nodemap);
3715                 if (ret!=0) {
3716                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3717                         ctdb_set_culprit(rec, ctdb->pnn);
3718                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3719                         return;
3720                 }
3721
3722                 ret = ctdb_takeover_run(ctdb, nodemap);
3723                 if (ret != 0) {
3724                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Try again later\n"));
3725                         return;
3726                 }
3727
3728                 /* execute the "recovered" event script on all nodes */
3729                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3730 #if 0
3731 // we can't check whether the event completed successfully
3732 // since this script WILL fail if the node is in recovery mode
3733 // and if that race happens, the code here would just cause a second
3734 // cascading recovery.
3735                 if (ret!=0) {
3736                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3737                         ctdb_set_culprit(rec, ctdb->pnn);
3738                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3739                 }
3740 #endif
3741         }
3742 }
3743
3744 /*
3745   the main monitoring loop
3746  */
3747 static void monitor_cluster(struct ctdb_context *ctdb)
3748 {
3749         struct ctdb_recoverd *rec;
3750
3751         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3752
3753         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3754         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3755
3756         rec->ctdb = ctdb;
3757
3758         rec->priority_time = timeval_current();
3759
3760         /* register a message port for sending memory dumps */
3761         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3762
3763         /* register a message port for recovery elections */
3764         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3765
3766         /* when nodes are disabled/enabled */
3767         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3768
3769         /* when we are asked to push out a flag change */
3770         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3771
3772         /* register a message port for vacuum fetch */
3773         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3774
3775         /* register a message port for reloadnodes  */
3776         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3777
3778         /* register a message port for performing a takeover run */
3779         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3780
3781         /* register a message port for performing a reload all ips */
3782         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_ALL_IPS, ip_reloadall_handler, rec);
3783
3784         /* register a message port for disabling the ip check for a short while */
3785         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3786
3787         /* register a message port for updating the recovery daemons node assignment for an ip */
3788         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3789
3790         /* register a message port for forcing a rebalance of a node next
3791            reallocation */
3792         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3793
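        /* run the monitoring loop forever; each iteration gets its own
           temporary talloc context */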
3794         for (;;) {
3795                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3796                 struct timeval start;
3797                 double elapsed;
3798
3799                 if (!mem_ctx) {
3800                         DEBUG(DEBUG_CRIT,(__location__
3801                                           " Failed to create temp context\n"));
3802                         exit(-1);
3803                 }
3804
3805                 start = timeval_current();
3806                 main_loop(ctdb, rec, mem_ctx);
3807                 talloc_free(mem_ctx);
3808
3809                 /* we only check for recovery once every recover_interval seconds */
3810                 elapsed = timeval_elapsed(&start);
3811                 if (elapsed < ctdb->tunable.recover_interval) {
3812                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3813                                           - elapsed);
3814                 }
3815         }
3816 }
3817
3818 /*
3819   event handler for when the main ctdbd dies
3820  */
3821 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3822                                  uint16_t flags, void *private_data)
3823 {
3824         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3825         _exit(1);
3826 }
3827
3828 /*
3829   called regularly to verify that the recovery daemon is still running
3830  */
3831 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3832                               struct timeval yt, void *p)
3833 {
3834         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3835
3836         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3837                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3838
3839                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3840                                 ctdb_restart_recd, ctdb);
3841
3842                 return;
3843         }
3844
3845         event_add_timed(ctdb->ev, ctdb, 
3846                         timeval_current_ofs(30, 0),
3847                         ctdb_check_recd, ctdb);
3848 }
3849
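/* SIGCHLD handler for the recovery daemon: reap all exited children so
   that helper processes (such as the reclock checker) do not linger as
   zombies */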
3850 static void recd_sig_child_handler(struct event_context *ev,
3851         struct signal_event *se, int signum, int count,
3852         void *dont_care, 
3853         void *private_data)
3854 {
3855 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3856         int status;
3857         pid_t pid = -1;
3858
3859         while (pid != 0) {
3860                 pid = waitpid(-1, &status, WNOHANG);
3861                 if (pid == -1) {
3862                         if (errno != ECHILD) {
3863                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3864                         }
3865                         return;
3866                 }
3867                 if (pid > 0) {
3868                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3869                 }
3870         }
3871 }
3872
3873 /*
3874   start up the recovery daemon as a child of the main ctdb daemon
3875  */
3876 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3877 {
3878         int fd[2];
3879         struct signal_event *se;
3880         struct tevent_fd *fde;
3881
3882         if (pipe(fd) != 0) {
3883                 return -1;
3884         }
3885
3886         ctdb->ctdbd_pid = getpid();
3887
3888         ctdb->recoverd_pid = ctdb_fork(ctdb);
3889         if (ctdb->recoverd_pid == -1) {
3890                 return -1;
3891         }
3892         
3893         if (ctdb->recoverd_pid != 0) {
3894                 close(fd[0]);
3895                 event_add_timed(ctdb->ev, ctdb, 
3896                                 timeval_current_ofs(30, 0),
3897                                 ctdb_check_recd, ctdb);
3898                 return 0;
3899         }
3900
3901         close(fd[1]);
3902
3903         srandom(getpid() ^ time(NULL));
3904
3905         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3906                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3907                 exit(1);
3908         }
3909
3910         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3911
3912         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3913                      ctdb_recoverd_parent, &fd[0]);     
3914         tevent_fd_set_auto_close(fde);
3915
3916         /* set up a handler to pick up sigchld */
3917         se = event_add_signal(ctdb->ev, ctdb,
3918                                      SIGCHLD, 0,
3919                                      recd_sig_child_handler,
3920                                      ctdb);
3921         if (se == NULL) {
3922                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3923                 exit(1);
3924         }
3925
3926         monitor_cluster(ctdb);
3927
3928         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3929         return -1;
3930 }
3931
3932 /*
3933   shutdown the recovery daemon
3934  */
3935 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3936 {
3937         if (ctdb->recoverd_pid == 0) {
3938                 return;
3939         }
3940
3941         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3942         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
3943 }
3944
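/* timed event callback used by ctdb_check_recd(): stop any old recovery
   daemon and start a new one */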
3945 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
3946                        struct timeval t, void *private_data)
3947 {
3948         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3949
3950         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3951         ctdb_stop_recoverd(ctdb);
3952         ctdb_start_recoverd(ctdb);
3953 }