recoverd: Verifying local IPs should only check for unhosted available IPs
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "dlinklist.h"
31
32
33 /* most recent reload all ips request we need to perform during the 
34    next monitoring loop
35 */
36 struct reloadips_all_reply *reload_all_ips_request = NULL;
37
38 /* list of "ctdb ipreallocate" processes to call back when we have
39    finished the takeover run.
40 */
41 struct ip_reallocate_list {
42         struct ip_reallocate_list *next;
43         struct rd_memdump_reply *rd;
44 };
45
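/* per-node count of recent misbehaviour and when it was last reported,
   used to decide when a node should be banned */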
46 struct ctdb_banning_state {
47         uint32_t count;
48         struct timeval last_reported_time;
49 };
50
51 /*
52   private state of recovery daemon
53  */
54 struct ctdb_recoverd {
55         struct ctdb_context *ctdb;
56         uint32_t recmaster;
57         uint32_t num_active;
58         uint32_t num_connected;
59         uint32_t last_culprit_node;
60         struct ctdb_node_map *nodemap;
61         struct timeval priority_time;
62         bool need_takeover_run;
63         bool need_recovery;
64         uint32_t node_flags;
65         struct timed_event *send_election_te;
66         struct timed_event *election_timeout;
67         struct vacuum_info *vacuum_info;
68         TALLOC_CTX *ip_reallocate_ctx;
69         struct ip_reallocate_list *reallocate_callers;
70         TALLOC_CTX *ip_check_disable_ctx;
71         struct ctdb_control_get_ifaces *ifaces;
72         TALLOC_CTX *deferred_rebalance_ctx;
73 };
74
75 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
76 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
77
78 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
79
80 /*
81   ban a node for a period of time
82  */
83 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
84 {
85         int ret;
86         struct ctdb_context *ctdb = rec->ctdb;
87         struct ctdb_ban_time bantime;
88        
89         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
90
91         if (!ctdb_validate_pnn(ctdb, pnn)) {
92                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
93                 return;
94         }
95
96         bantime.pnn  = pnn;
97         bantime.time = ban_time;
98
99         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
100         if (ret != 0) {
101                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
102                 return;
103         }
104
105 }
106
107 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
108
109
110 /*
111   remember the trouble maker
112  */
113 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
114 {
115         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
116         struct ctdb_banning_state *ban_state;
117
118         if (culprit >= ctdb->num_nodes) {
119                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
120                 return;
121         }
122
123         if (ctdb->nodes[culprit]->ban_state == NULL) {
124                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
125                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
126
127                 
128         }
129         ban_state = ctdb->nodes[culprit]->ban_state;
130         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
131                 /* this was the first time in a long while this node
132                    misbehaved so we will forgive any old transgressions.
133                 */
134                 ban_state->count = 0;
135         }
136
137         ban_state->count += count;
138         ban_state->last_reported_time = timeval_current();
139         rec->last_culprit_node = culprit;
140 }
141
142 /*
143   remember the trouble maker
144  */
145 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
146 {
147         ctdb_set_culprit_count(rec, culprit, 1);
148 }
149
150
151 /* this callback is called for every node that failed to execute the
152    recovered event
153 */
154 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
155 {
156         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
157
158         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
159
160         ctdb_set_culprit(rec, node_pnn);
161 }
162
163 /*
164   run the "recovered" eventscript on all nodes
165  */
166 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
167 {
168         TALLOC_CTX *tmp_ctx;
169         uint32_t *nodes;
170         struct ctdb_context *ctdb = rec->ctdb;
171
172         tmp_ctx = talloc_new(ctdb);
173         CTDB_NO_MEMORY(ctdb, tmp_ctx);
174
175         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
176         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
177                                         nodes, 0,
178                                         CONTROL_TIMEOUT(), false, tdb_null,
179                                         NULL, recovered_fail_callback,
180                                         rec) != 0) {
181                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
182
183                 talloc_free(tmp_ctx);
184                 return -1;
185         }
186
187         talloc_free(tmp_ctx);
188         return 0;
189 }
190
191 /* this callback is called for every node that failed to execute the
192    start recovery event
193 */
194 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
195 {
196         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
197
198         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
199
200         ctdb_set_culprit(rec, node_pnn);
201 }
202
203 /*
204   run the "startrecovery" eventscript on all nodes
205  */
206 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
207 {
208         TALLOC_CTX *tmp_ctx;
209         uint32_t *nodes;
210         struct ctdb_context *ctdb = rec->ctdb;
211
212         tmp_ctx = talloc_new(ctdb);
213         CTDB_NO_MEMORY(ctdb, tmp_ctx);
214
215         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
216         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
217                                         nodes, 0,
218                                         CONTROL_TIMEOUT(), false, tdb_null,
219                                         NULL,
220                                         startrecovery_fail_callback,
221                                         rec) != 0) {
222                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
223                 talloc_free(tmp_ctx);
224                 return -1;
225         }
226
227         talloc_free(tmp_ctx);
228         return 0;
229 }
230
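/* callback for each GET_CAPABILITIES reply: cache the capabilities on the
   node structure and, for the local node, in ctdb->capabilities */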
231 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
232 {
233         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
234                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
235                 return;
236         }
237         if (node_pnn < ctdb->num_nodes) {
238                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
239         }
240
241         if (node_pnn == ctdb->pnn) {
242                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
243         }
244 }
245
246 /*
247   update the node capabilities for all connected nodes
248  */
249 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
250 {
251         uint32_t *nodes;
252         TALLOC_CTX *tmp_ctx;
253
254         tmp_ctx = talloc_new(ctdb);
255         CTDB_NO_MEMORY(ctdb, tmp_ctx);
256
257         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
258         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
259                                         nodes, 0,
260                                         CONTROL_TIMEOUT(),
261                                         false, tdb_null,
262                                         async_getcap_callback, NULL,
263                                         NULL) != 0) {
264                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
265                 talloc_free(tmp_ctx);
266                 return -1;
267         }
268
269         talloc_free(tmp_ctx);
270         return 0;
271 }
272
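/* a node that failed to freeze during recovery is given a full round of
   culprit credits so it is banned quickly */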
273 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
274 {
275         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
276
277         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
278         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
279 }
280
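/* likewise, a node that failed to start the recovery transaction gets a
   full round of culprit credits */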
281 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
282 {
283         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
284
285         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
286         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
287 }
288
289 /*
290   change recovery mode on all nodes
291  */
292 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
293 {
294         TDB_DATA data;
295         uint32_t *nodes;
296         TALLOC_CTX *tmp_ctx;
297
298         tmp_ctx = talloc_new(ctdb);
299         CTDB_NO_MEMORY(ctdb, tmp_ctx);
300
301         /* freeze all nodes */
302         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
303         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
304                 int i;
305
306                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
307                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
308                                                 nodes, i,
309                                                 CONTROL_TIMEOUT(),
310                                                 false, tdb_null,
311                                                 NULL,
312                                                 set_recmode_fail_callback,
313                                                 rec) != 0) {
314                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
315                                 talloc_free(tmp_ctx);
316                                 return -1;
317                         }
318                 }
319         }
320
321
322         data.dsize = sizeof(uint32_t);
323         data.dptr = (unsigned char *)&rec_mode;
324
325         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
326                                         nodes, 0,
327                                         CONTROL_TIMEOUT(),
328                                         false, data,
329                                         NULL, NULL,
330                                         NULL) != 0) {
331                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
332                 talloc_free(tmp_ctx);
333                 return -1;
334         }
335
336         talloc_free(tmp_ctx);
337         return 0;
338 }
339
340 /*
341   change recovery master on all nodes
342  */
343 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
344 {
345         TDB_DATA data;
346         TALLOC_CTX *tmp_ctx;
347         uint32_t *nodes;
348
349         tmp_ctx = talloc_new(ctdb);
350         CTDB_NO_MEMORY(ctdb, tmp_ctx);
351
352         data.dsize = sizeof(uint32_t);
353         data.dptr = (unsigned char *)&pnn;
354
355         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
356         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
357                                         nodes, 0,
358                                         CONTROL_TIMEOUT(), false, data,
359                                         NULL, NULL,
360                                         NULL) != 0) {
361                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
362                 talloc_free(tmp_ctx);
363                 return -1;
364         }
365
366         talloc_free(tmp_ctx);
367         return 0;
368 }
369
370 /* update all remote nodes to use the same db priority that we have.
371    This can fail if the remote node has not yet been upgraded to 
372    support this function, so we always return success and never fail
373    a recovery if this call fails.
374 */
375 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
376         struct ctdb_node_map *nodemap, 
377         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
378 {
379         int db;
380         uint32_t *nodes;
381
382         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
383
384         /* step through all local databases */
385         for (db=0; db<dbmap->num;db++) {
386                 TDB_DATA data;
387                 struct ctdb_db_priority db_prio;
388                 int ret;
389
390                 db_prio.db_id     = dbmap->dbs[db].dbid;
391                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
392                 if (ret != 0) {
393                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
394                         continue;
395                 }
396
397                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
398
399                 data.dptr  = (uint8_t *)&db_prio;
400                 data.dsize = sizeof(db_prio);
401
402                 if (ctdb_client_async_control(ctdb,
403                                         CTDB_CONTROL_SET_DB_PRIORITY,
404                                         nodes, 0,
405                                         CONTROL_TIMEOUT(), false, data,
406                                         NULL, NULL,
407                                         NULL) != 0) {
408                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
409                 }
410         }
411
412         return 0;
413 }                       
414
415 /*
416   ensure all other nodes have attached to any databases that we have
417  */
418 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
419                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
420 {
421         int i, j, db, ret;
422         struct ctdb_dbid_map *remote_dbmap;
423
424         /* verify that all other nodes have all our databases */
425         for (j=0; j<nodemap->num; j++) {
426                 /* we don't need to check ourselves */
427                 if (nodemap->nodes[j].pnn == pnn) {
428                         continue;
429                 }
430                 /* dont check nodes that are unavailable */
431                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
432                         continue;
433                 }
434
435                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
436                                          mem_ctx, &remote_dbmap);
437                 if (ret != 0) {
438                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
439                         return -1;
440                 }
441
442                 /* step through all local databases */
443                 for (db=0; db<dbmap->num;db++) {
444                         const char *name;
445
446
447                         for (i=0;i<remote_dbmap->num;i++) {
448                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
449                                         break;
450                                 }
451                         }
452                         /* the remote node already has this database */
453                         if (i!=remote_dbmap->num) {
454                                 continue;
455                         }
456                         /* ok so we need to create this database */
457                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
458                                             mem_ctx, &name);
459                         if (ret != 0) {
460                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
461                                 return -1;
462                         }
463                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
464                                            mem_ctx, name,
465                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
466                         if (ret != 0) {
467                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
468                                 return -1;
469                         }
470                 }
471         }
472
473         return 0;
474 }
475
476
477 /*
478   ensure we are attached to any databases that anyone else is attached to
479  */
480 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
481                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
482 {
483         int i, j, db, ret;
484         struct ctdb_dbid_map *remote_dbmap;
485
486         /* verify that we have all databases any other node has */
487         for (j=0; j<nodemap->num; j++) {
488                 /* we don't need to check ourselves */
489                 if (nodemap->nodes[j].pnn == pnn) {
490                         continue;
491                 }
492                 /* dont check nodes that are unavailable */
493                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
494                         continue;
495                 }
496
497                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
498                                          mem_ctx, &remote_dbmap);
499                 if (ret != 0) {
500                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
501                         return -1;
502                 }
503
504                 /* step through all databases on the remote node */
505                 for (db=0; db<remote_dbmap->num;db++) {
506                         const char *name;
507
508                         for (i=0;i<(*dbmap)->num;i++) {
509                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
510                                         break;
511                                 }
512                         }
513                         /* we already have this db locally */
514                         if (i!=(*dbmap)->num) {
515                                 continue;
516                         }
517                         /* ok so we need to create this database and
518                            rebuild dbmap
519                          */
520                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
521                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
522                         if (ret != 0) {
523                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
524                                           nodemap->nodes[j].pnn));
525                                 return -1;
526                         }
527                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
528                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
529                         if (ret != 0) {
530                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
531                                 return -1;
532                         }
533                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
534                         if (ret != 0) {
535                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
536                                 return -1;
537                         }
538                 }
539         }
540
541         return 0;
542 }
543
544
545 /*
546   pull the remote database contents from one node into the recdb
547  */
548 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
549                                     struct tdb_wrap *recdb, uint32_t dbid)
550 {
551         int ret;
552         TDB_DATA outdata;
553         struct ctdb_marshall_buffer *reply;
554         struct ctdb_rec_data *rec;
555         int i;
556         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
557
558         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
559                                CONTROL_TIMEOUT(), &outdata);
560         if (ret != 0) {
561                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
562                 talloc_free(tmp_ctx);
563                 return -1;
564         }
565
566         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
567
568         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
569                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
570                 talloc_free(tmp_ctx);
571                 return -1;
572         }
573         
574         rec = (struct ctdb_rec_data *)&reply->data[0];
575         
576         for (i=0;
577              i<reply->count;
578              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
579                 TDB_DATA key, data;
580                 struct ctdb_ltdb_header *hdr;
581                 TDB_DATA existing;
582                 
583                 key.dptr = &rec->data[0];
584                 key.dsize = rec->keylen;
585                 data.dptr = &rec->data[key.dsize];
586                 data.dsize = rec->datalen;
587                 
588                 hdr = (struct ctdb_ltdb_header *)data.dptr;
589
590                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
591                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
592                         talloc_free(tmp_ctx);
593                         return -1;
594                 }
595
596                 /* fetch the existing record, if any */
597                 existing = tdb_fetch(recdb->tdb, key);
598                 
599                 if (existing.dptr != NULL) {
600                         struct ctdb_ltdb_header header;
601                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
602                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
603                                          (unsigned)existing.dsize, srcnode));
604                                 free(existing.dptr);
605                                 talloc_free(tmp_ctx);
606                                 return -1;
607                         }
608                         header = *(struct ctdb_ltdb_header *)existing.dptr;
609                         free(existing.dptr);
610                         if (!(header.rsn < hdr->rsn ||
611                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
612                                 continue;
613                         }
614                 }
615                 
616                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
617                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
618                         talloc_free(tmp_ctx);
619                         return -1;                              
620                 }
621         }
622
623         talloc_free(tmp_ctx);
624
625         return 0;
626 }
627
628
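/* state shared by the GET_DB_SEQNUM callbacks while looking for the node
   that holds the highest sequence number of a persistent database */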
629 struct pull_seqnum_cbdata {
630         int failed;
631         uint32_t pnn;
632         uint64_t seqnum;
633 };
634
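/* per-node GET_DB_SEQNUM reply: remember the highest sequence number seen
   so far and which node reported it */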
635 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
636 {
637         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
638         uint64_t seqnum;
639
640         if (cb_data->failed != 0) {
641                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
642                 return;
643         }
644
645         if (res != 0) {
646                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
647                 cb_data->failed = 1;
648                 return;
649         }
650
651         if (outdata.dsize != sizeof(uint64_t)) {
652                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
653                 cb_data->failed = 1;
654                 return;
655         }
656
657         seqnum = *((uint64_t *)outdata.dptr);
658
659         if (seqnum > cb_data->seqnum) {
660                 cb_data->seqnum = seqnum;
661                 cb_data->pnn = node_pnn;
662         }
663 }
664
665 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
666 {
667         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
668
669         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
670         cb_data->failed = 1;
671 }
672
673 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
674                                 struct ctdb_recoverd *rec, 
675                                 struct ctdb_node_map *nodemap, 
676                                 struct tdb_wrap *recdb, uint32_t dbid)
677 {
678         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
679         uint32_t *nodes;
680         TDB_DATA data;
681         uint32_t outdata[2];
682         struct pull_seqnum_cbdata *cb_data;
683
684         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
685
686         outdata[0] = dbid;
687         outdata[1] = 0;
688
689         data.dsize = sizeof(outdata);
690         data.dptr  = (uint8_t *)&outdata[0];
691
692         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
693         if (cb_data == NULL) {
694                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
695                 talloc_free(tmp_ctx);
696                 return -1;
697         }
698
699         cb_data->failed = 0;
700         cb_data->pnn    = -1;
701         cb_data->seqnum = 0;
702         
703         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
704         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
705                                         nodes, 0,
706                                         CONTROL_TIMEOUT(), false, data,
707                                         pull_seqnum_cb,
708                                         pull_seqnum_fail_cb,
709                                         cb_data) != 0) {
710                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
711
712                 talloc_free(tmp_ctx);
713                 return -1;
714         }
715
716         if (cb_data->failed != 0) {
717                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
718                 talloc_free(tmp_ctx);
719                 return -1;
720         }
721
722         if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
723                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
724                 talloc_free(tmp_ctx);
725                 return -1;
726         }
727
728         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
729
730         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
731                 DEBUG(DEBUG_ERR, ("Failed to pull highest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
732                 talloc_free(tmp_ctx);
733                 return -1;
734         }
735
736         talloc_free(tmp_ctx);
737         return 0;
738 }
739
740
741 /*
742   pull all the remote database contents into the recdb
743  */
744 static int pull_remote_database(struct ctdb_context *ctdb,
745                                 struct ctdb_recoverd *rec, 
746                                 struct ctdb_node_map *nodemap, 
747                                 struct tdb_wrap *recdb, uint32_t dbid,
748                                 bool persistent)
749 {
750         int j;
751
752         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
753                 int ret;
754                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
755                 if (ret == 0) {
756                         return 0;
757                 }
758         }
759
760         /* pull all records from all other nodes across onto this node
761            (this merges based on rsn)
762         */
763         for (j=0; j<nodemap->num; j++) {
764                 /* dont merge from nodes that are unavailable */
765                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
766                         continue;
767                 }
768                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
769                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
770                                  nodemap->nodes[j].pnn));
771                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
772                         return -1;
773                 }
774         }
775         
776         return 0;
777 }
778
779
780 /*
781   update flags on all active nodes
782  */
783 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
784 {
785         int ret;
786
787         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
788         if (ret != 0) {
789                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
790                 return -1;
791         }
792
793         return 0;
794 }
795
796 /*
797   ensure all nodes have the same vnnmap we do
798  */
799 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
800                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
801 {
802         int j, ret;
803
804         /* push the new vnn map out to all the nodes */
805         for (j=0; j<nodemap->num; j++) {
806                 /* dont push to nodes that are unavailable */
807                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
808                         continue;
809                 }
810
811                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
812                 if (ret != 0) {
813                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
814                         return -1;
815                 }
816         }
817
818         return 0;
819 }
820
821
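/* one outstanding vacuum-fetch run: the records a source node has asked us
   to migrate back to this node, processed one at a time */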
822 struct vacuum_info {
823         struct vacuum_info *next, *prev;
824         struct ctdb_recoverd *rec;
825         uint32_t srcnode;
826         struct ctdb_db_context *ctdb_db;
827         struct ctdb_marshall_buffer *recs;
828         struct ctdb_rec_data *r;
829 };
830
831 static void vacuum_fetch_next(struct vacuum_info *v);
832
833 /*
834   called when a vacuum fetch has completed - just free it and do the next one
835  */
836 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
837 {
838         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
839         talloc_free(state);
840         vacuum_fetch_next(v);
841 }
842
843
844 /*
845   process the next element from the vacuum list
846 */
847 static void vacuum_fetch_next(struct vacuum_info *v)
848 {
849         struct ctdb_call call;
850         struct ctdb_rec_data *r;
851
852         while (v->recs->count) {
853                 struct ctdb_client_call_state *state;
854                 TDB_DATA data;
855                 struct ctdb_ltdb_header *hdr;
856
857                 ZERO_STRUCT(call);
858                 call.call_id = CTDB_NULL_FUNC;
859                 call.flags = CTDB_IMMEDIATE_MIGRATION;
860                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
861
862                 r = v->r;
863                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
864                 v->recs->count--;
865
866                 call.key.dptr = &r->data[0];
867                 call.key.dsize = r->keylen;
868
869                 /* ensure we don't block this daemon - just skip a record if we can't get
870                    the chainlock */
871                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
872                         continue;
873                 }
874
875                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
876                 if (data.dptr == NULL) {
877                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
878                         continue;
879                 }
880
881                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
882                         free(data.dptr);
883                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
884                         continue;
885                 }
886                 
887                 hdr = (struct ctdb_ltdb_header *)data.dptr;
888                 if (hdr->dmaster == v->rec->ctdb->pnn) {
889                         /* it's already local */
890                         free(data.dptr);
891                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
892                         continue;
893                 }
894
895                 free(data.dptr);
896
897                 state = ctdb_call_send(v->ctdb_db, &call);
898                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
899                 if (state == NULL) {
900                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
901                         talloc_free(v);
902                         return;
903                 }
904                 state->async.fn = vacuum_fetch_callback;
905                 state->async.private_data = v;
906                 return;
907         }
908
909         talloc_free(v);
910 }
911
912
913 /*
914   destroy a vacuum info structure
915  */
916 static int vacuum_info_destructor(struct vacuum_info *v)
917 {
918         DLIST_REMOVE(v->rec->vacuum_info, v);
919         return 0;
920 }
921
922
923 /*
924   handler for vacuum fetch
925 */
926 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
927                                  TDB_DATA data, void *private_data)
928 {
929         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
930         struct ctdb_marshall_buffer *recs;
931         int ret, i;
932         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
933         const char *name;
934         struct ctdb_dbid_map *dbmap=NULL;
935         bool persistent = false;
936         struct ctdb_db_context *ctdb_db;
937         struct ctdb_rec_data *r;
938         uint32_t srcnode;
939         struct vacuum_info *v;
940
941         recs = (struct ctdb_marshall_buffer *)data.dptr;
942         r = (struct ctdb_rec_data *)&recs->data[0];
943
944         if (recs->count == 0) {
945                 talloc_free(tmp_ctx);
946                 return;
947         }
948
949         srcnode = r->reqid;
950
951         for (v=rec->vacuum_info;v;v=v->next) {
952                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
953                         /* we're already working on records from this node */
954                         talloc_free(tmp_ctx);
955                         return;
956                 }
957         }
958
959         /* work out if the database is persistent */
960         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
961         if (ret != 0) {
962                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
963                 talloc_free(tmp_ctx);
964                 return;
965         }
966
967         for (i=0;i<dbmap->num;i++) {
968                 if (dbmap->dbs[i].dbid == recs->db_id) {
969                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
970                         break;
971                 }
972         }
973         if (i == dbmap->num) {
974                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
975                 talloc_free(tmp_ctx);
976                 return;         
977         }
978
979         /* find the name of this database */
980         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
981                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
982                 talloc_free(tmp_ctx);
983                 return;
984         }
985
986         /* attach to it */
987         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
988         if (ctdb_db == NULL) {
989                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
990                 talloc_free(tmp_ctx);
991                 return;
992         }
993
994         v = talloc_zero(rec, struct vacuum_info);
995         if (v == NULL) {
996                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
997                 talloc_free(tmp_ctx);
998                 return;
999         }
1000
1001         v->rec = rec;
1002         v->srcnode = srcnode;
1003         v->ctdb_db = ctdb_db;
1004         v->recs = talloc_memdup(v, recs, data.dsize);
1005         if (v->recs == NULL) {
1006                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1007                 talloc_free(v);
1008                 talloc_free(tmp_ctx);
1009                 return;         
1010         }
1011         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
1012
1013         DLIST_ADD(rec->vacuum_info, v);
1014
1015         talloc_set_destructor(v, vacuum_info_destructor);
1016
1017         vacuum_fetch_next(v);
1018         talloc_free(tmp_ctx);
1019 }
1020
1021
1022 /*
1023   called when ctdb_wait_timeout should finish
1024  */
1025 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1026                               struct timeval yt, void *p)
1027 {
1028         uint32_t *timed_out = (uint32_t *)p;
1029         (*timed_out) = 1;
1030 }
1031
1032 /*
1033   wait for a given number of seconds
1034  */
1035 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1036 {
1037         uint32_t timed_out = 0;
1038         time_t usecs = (secs - (time_t)secs) * 1000000;
1039         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1040         while (!timed_out) {
1041                 event_loop_once(ctdb->ev);
1042         }
1043 }
1044
1045 /*
1046   called when an election times out (ends)
1047  */
1048 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1049                                   struct timeval t, void *p)
1050 {
1051         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1052         rec->election_timeout = NULL;
1053         fast_start = false;
1054
1055         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1056 }
1057
1058
1059 /*
1060   wait for an election to finish. It finishes election_timeout seconds after
1061   the last election packet is received
1062  */
1063 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1064 {
1065         struct ctdb_context *ctdb = rec->ctdb;
1066         while (rec->election_timeout) {
1067                 event_loop_once(ctdb->ev);
1068         }
1069 }
1070
1071 /*
1072   Update our local flags from all remote connected nodes. 
1073   This is only run when we are, or we believe we are, the recovery master
1074  */
1075 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1076 {
1077         int j;
1078         struct ctdb_context *ctdb = rec->ctdb;
1079         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1080
1081         /* get the nodemap for all active remote nodes and verify
1082            they are the same as for this node
1083          */
1084         for (j=0; j<nodemap->num; j++) {
1085                 struct ctdb_node_map *remote_nodemap=NULL;
1086                 int ret;
1087
1088                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1089                         continue;
1090                 }
1091                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1092                         continue;
1093                 }
1094
1095                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1096                                            mem_ctx, &remote_nodemap);
1097                 if (ret != 0) {
1098                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1099                                   nodemap->nodes[j].pnn));
1100                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1101                         talloc_free(mem_ctx);
1102                         return MONITOR_FAILED;
1103                 }
1104                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1105                         /* We should tell our daemon about this so it
1106                            updates its flags or else we will log the same 
1107                            message again in the next iteration of recovery.
1108                            Since we are the recovery master we can just as
1109                            well update the flags on all nodes.
1110                         */
1111                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
1112                         if (ret != 0) {
1113                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1114                                 return -1;
1115                         }
1116
1117                         /* Update our local copy of the flags in the recovery
1118                            daemon.
1119                         */
1120                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1121                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1122                                  nodemap->nodes[j].flags));
1123                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1124                 }
1125                 talloc_free(remote_nodemap);
1126         }
1127         talloc_free(mem_ctx);
1128         return MONITOR_OK;
1129 }
1130
1131
1132 /* Create a new random generation id.
1133    The generation id cannot be the INVALID_GENERATION id
1134 */
1135 static uint32_t new_generation(void)
1136 {
1137         uint32_t generation;
1138
1139         while (1) {
1140                 generation = random();
1141
1142                 if (generation != INVALID_GENERATION) {
1143                         break;
1144                 }
1145         }
1146
1147         return generation;
1148 }
1149
1150
1151 /*
1152   create a temporary working database
1153  */
1154 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1155 {
1156         char *name;
1157         struct tdb_wrap *recdb;
1158         unsigned tdb_flags;
1159
1160         /* open up the temporary recovery database */
1161         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1162                                ctdb->db_directory_state,
1163                                ctdb->pnn);
1164         if (name == NULL) {
1165                 return NULL;
1166         }
1167         unlink(name);
1168
1169         tdb_flags = TDB_NOLOCK;
1170         if (ctdb->valgrinding) {
1171                 tdb_flags |= TDB_NOMMAP;
1172         }
1173         tdb_flags |= TDB_DISALLOW_NESTING;
1174
1175         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1176                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1177         if (recdb == NULL) {
1178                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1179         }
1180
1181         talloc_free(name);
1182
1183         return recdb;
1184 }
1185
1186
1187 /* 
1188    a traverse function for pulling all relevant records from recdb
1189  */
1190 struct recdb_data {
1191         struct ctdb_context *ctdb;
1192         struct ctdb_marshall_buffer *recdata;
1193         uint32_t len;
1194         uint32_t allocated_len;
1195         bool failed;
1196         bool persistent;
1197 };
1198
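/* invoked for every record in the recovery db: skip empty records, point
   dmaster at the recovery master (for non-persistent dbs) and append the
   record to the marshall buffer being built */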
1199 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1200 {
1201         struct recdb_data *params = (struct recdb_data *)p;
1202         struct ctdb_rec_data *rec;
1203         struct ctdb_ltdb_header *hdr;
1204
1205         /* skip empty records */
1206         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1207                 return 0;
1208         }
1209
1210         /* update the dmaster field to point to us */
1211         hdr = (struct ctdb_ltdb_header *)data.dptr;
1212         if (!params->persistent) {
1213                 hdr->dmaster = params->ctdb->pnn;
1214                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1215         }
1216
1217         /* add the record to the blob ready to send to the nodes */
1218         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1219         if (rec == NULL) {
1220                 params->failed = true;
1221                 return -1;
1222         }
1223         if (params->len + rec->length >= params->allocated_len) {
1224                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1225                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1226         }
1227         if (params->recdata == NULL) {
1228                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1229                          rec->length + params->len, params->recdata->count));
1230                 params->failed = true;
1231                 return -1;
1232         }
1233         params->recdata->count++;
1234         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1235         params->len += rec->length;
1236         talloc_free(rec);
1237
1238         return 0;
1239 }
1240
1241 /*
1242   push the recdb database out to all nodes
1243  */
1244 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1245                                bool persistent,
1246                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1247 {
1248         struct recdb_data params;
1249         struct ctdb_marshall_buffer *recdata;
1250         TDB_DATA outdata;
1251         TALLOC_CTX *tmp_ctx;
1252         uint32_t *nodes;
1253
1254         tmp_ctx = talloc_new(ctdb);
1255         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1256
1257         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1258         CTDB_NO_MEMORY(ctdb, recdata);
1259
1260         recdata->db_id = dbid;
1261
1262         params.ctdb = ctdb;
1263         params.recdata = recdata;
1264         params.len = offsetof(struct ctdb_marshall_buffer, data);
1265         params.allocated_len = params.len;
1266         params.failed = false;
1267         params.persistent = persistent;
1268
1269         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1270                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1271                 talloc_free(params.recdata);
1272                 talloc_free(tmp_ctx);
1273                 return -1;
1274         }
1275
1276         if (params.failed) {
1277                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1278                 talloc_free(params.recdata);
1279                 talloc_free(tmp_ctx);
1280                 return -1;              
1281         }
1282
1283         recdata = params.recdata;
1284
1285         outdata.dptr = (void *)recdata;
1286         outdata.dsize = params.len;
1287
1288         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1289         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1290                                         nodes, 0,
1291                                         CONTROL_TIMEOUT(), false, outdata,
1292                                         NULL, NULL,
1293                                         NULL) != 0) {
1294                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1295                 talloc_free(recdata);
1296                 talloc_free(tmp_ctx);
1297                 return -1;
1298         }
1299
1300         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1301                   dbid, recdata->count));
1302
1303         talloc_free(recdata);
1304         talloc_free(tmp_ctx);
1305
1306         return 0;
1307 }
1308
1309
1310 /*
1311   go through a full recovery on one database 
1312  */
1313 static int recover_database(struct ctdb_recoverd *rec, 
1314                             TALLOC_CTX *mem_ctx,
1315                             uint32_t dbid,
1316                             bool persistent,
1317                             uint32_t pnn, 
1318                             struct ctdb_node_map *nodemap,
1319                             uint32_t transaction_id)
1320 {
1321         struct tdb_wrap *recdb;
1322         int ret;
1323         struct ctdb_context *ctdb = rec->ctdb;
1324         TDB_DATA data;
1325         struct ctdb_control_wipe_database w;
1326         uint32_t *nodes;
1327
1328         recdb = create_recdb(ctdb, mem_ctx);
1329         if (recdb == NULL) {
1330                 return -1;
1331         }
1332
1333         /* pull all remote databases onto the recdb */
1334         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1335         if (ret != 0) {
1336                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1337                 return -1;
1338         }
1339
1340         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1341
1342         /* wipe all the remote databases. This is safe as we are in a transaction */
1343         w.db_id = dbid;
1344         w.transaction_id = transaction_id;
1345
1346         data.dptr = (void *)&w;
1347         data.dsize = sizeof(w);
1348
1349         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1350         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1351                                         nodes, 0,
1352                                         CONTROL_TIMEOUT(), false, data,
1353                                         NULL, NULL,
1354                                         NULL) != 0) {
1355                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1356                 talloc_free(recdb);
1357                 return -1;
1358         }
1359         
1360         /* push out the correct database. This sets the dmaster and skips 
1361            the empty records */
1362         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1363         if (ret != 0) {
1364                 talloc_free(recdb);
1365                 return -1;
1366         }
1367
1368         /* all done with this database */
1369         talloc_free(recdb);
1370
1371         return 0;
1372 }
1373
1374 /*
1375   reload the nodes file 
1376 */
1377 static void reload_nodes_file(struct ctdb_context *ctdb)
1378 {
1379         ctdb->nodes = NULL;
1380         ctdb_load_nodes_file(ctdb);
1381 }
1382
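/* refresh the cached known and available public IP lists for all active
   nodes; if a node's allocation looks inconsistent a takeover run is
   flagged, and on failure *culprit names the node that could not be queried */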
1383 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1384                                          struct ctdb_recoverd *rec,
1385                                          struct ctdb_node_map *nodemap,
1386                                          uint32_t *culprit)
1387 {
1388         int j;
1389         int ret;
1390
1391         if (ctdb->num_nodes != nodemap->num) {
1392                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1393                                   ctdb->num_nodes, nodemap->num));
1394                 if (culprit) {
1395                         *culprit = ctdb->pnn;
1396                 }
1397                 return -1;
1398         }
1399
1400         for (j=0; j<nodemap->num; j++) {
1401                 /* release any existing data */
1402                 if (ctdb->nodes[j]->known_public_ips) {
1403                         talloc_free(ctdb->nodes[j]->known_public_ips);
1404                         ctdb->nodes[j]->known_public_ips = NULL;
1405                 }
1406                 if (ctdb->nodes[j]->available_public_ips) {
1407                         talloc_free(ctdb->nodes[j]->available_public_ips);
1408                         ctdb->nodes[j]->available_public_ips = NULL;
1409                 }
1410
1411                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1412                         continue;
1413                 }
1414
1415                 /* grab a new shiny list of public ips from the node */
1416                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1417                                         CONTROL_TIMEOUT(),
1418                                         ctdb->nodes[j]->pnn,
1419                                         ctdb->nodes,
1420                                         0,
1421                                         &ctdb->nodes[j]->known_public_ips);
1422                 if (ret != 0) {
1423                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1424                                 ctdb->nodes[j]->pnn));
1425                         if (culprit) {
1426                                 *culprit = ctdb->nodes[j]->pnn;
1427                         }
1428                         return -1;
1429                 }
1430
1431                 if (ctdb->do_checkpublicip) {
1432                         if (rec->ip_check_disable_ctx == NULL) {
1433                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1434                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1435                                         rec->need_takeover_run = true;
1436                                 }
1437                         }
1438                 }
1439
1440                 /* grab a new shiny list of public ips from the node */
1441                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1442                                         CONTROL_TIMEOUT(),
1443                                         ctdb->nodes[j]->pnn,
1444                                         ctdb->nodes,
1445                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1446                                         &ctdb->nodes[j]->available_public_ips);
1447                 if (ret != 0) {
1448                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1449                                 ctdb->nodes[j]->pnn));
1450                         if (culprit) {
1451                                 *culprit = ctdb->nodes[j]->pnn;
1452                         }
1453                         return -1;
1454                 }
1455         }
1456
1457         return 0;
1458 }
1459
1460 /* when we start a recovery, make sure all nodes use the same reclock file
1461    setting
1462 */
1463 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1464 {
1465         struct ctdb_context *ctdb = rec->ctdb;
1466         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1467         TDB_DATA data;
1468         uint32_t *nodes;
1469
1470         if (ctdb->recovery_lock_file == NULL) {
1471                 data.dptr  = NULL;
1472                 data.dsize = 0;
1473         } else {
1474                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1475                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1476         }
1477
1478         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1479         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1480                                         nodes, 0,
1481                                         CONTROL_TIMEOUT(),
1482                                         false, data,
1483                                         NULL, NULL,
1484                                         rec) != 0) {
1485                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1486                 talloc_free(tmp_ctx);
1487                 return -1;
1488         }
1489
1490         talloc_free(tmp_ctx);
1491         return 0;
1492 }
1493
1494
1495 /*
1496   we are the recmaster, and recovery is needed - start a recovery run
1497  */
1498 static int do_recovery(struct ctdb_recoverd *rec, 
1499                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1500                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1501 {
1502         struct ctdb_context *ctdb = rec->ctdb;
1503         int i, j, ret;
1504         uint32_t generation;
1505         struct ctdb_dbid_map *dbmap;
1506         TDB_DATA data;
1507         uint32_t *nodes;
1508         struct timeval start_time;
1509         uint32_t culprit = (uint32_t)-1;
1510
1511         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1512
1513         /* if recovery fails, force it again */
1514         rec->need_recovery = true;
1515
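        /* ban any node that has caused too many recoveries since the
           last successful recovery */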
1516         for (i=0; i<ctdb->num_nodes; i++) {
1517                 struct ctdb_banning_state *ban_state;
1518
1519                 if (ctdb->nodes[i]->ban_state == NULL) {
1520                         continue;
1521                 }
1522                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1523                 if (ban_state->count < 2*ctdb->num_nodes) {
1524                         continue;
1525                 }
1526                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1527                         ctdb->nodes[i]->pnn, ban_state->count,
1528                         ctdb->tunable.recovery_ban_period));
1529                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1530                 ban_state->count = 0;
1531         }
1532
1533
1534         if (ctdb->tunable.verify_recovery_lock != 0) {
1535                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1536                 start_time = timeval_current();
1537                 if (!ctdb_recovery_lock(ctdb, true)) {
1538                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1539                                          "and banning ourselves for %u seconds\n",
1540                                          ctdb->tunable.recovery_ban_period));
1541                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1542                         return -1;
1543                 }
1544                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1545                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1546         }
1547
1548         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1549
1550         /* get a list of all databases */
1551         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1552         if (ret != 0) {
1553                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1554                 return -1;
1555         }
1556
1557         /* we do the db creation before we set the recovery mode, so the freeze happens
1558            on all databases we will be dealing with. */
1559
1560         /* verify that we have all the databases any other node has */
1561         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1562         if (ret != 0) {
1563                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1564                 return -1;
1565         }
1566
1567         /* verify that all other nodes have all our databases */
1568         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1569         if (ret != 0) {
1570                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1571                 return -1;
1572         }
1573         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1574
1575         /* update the database priority for all remote databases */
1576         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1577         if (ret != 0) {
1578                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1579         }
1580         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1581
1582
1583         /* update all other nodes to use the same setting for reclock files
1584            as the local recovery master.
1585         */
1586         sync_recovery_lock_file_across_cluster(rec);
1587
1588         /* set recovery mode to active on all nodes */
1589         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1590         if (ret != 0) {
1591                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1592                 return -1;
1593         }
1594
1595         /* execute the "startrecovery" event script on all nodes */
1596         ret = run_startrecovery_eventscript(rec, nodemap);
1597         if (ret!=0) {
1598                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1599                 return -1;
1600         }
1601
1602         /*
1603           update all nodes to have the same flags that we have
1604          */
1605         for (i=0;i<nodemap->num;i++) {
1606                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1607                         continue;
1608                 }
1609
1610                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1611                 if (ret != 0) {
1612                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1613                         return -1;
1614                 }
1615         }
1616
1617         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1618
1619         /* pick a new generation number */
1620         generation = new_generation();
1621
1622         /* change the vnnmap on this node to use the new generation 
1623            number but not on any other nodes.
1624            this guarantees that if we abort the recovery prematurely
1625            for some reason (a node stops responding?)
1626            that we can just return immediately and we will reenter
1627            recovery shortly again.
1628            I.e. we deliberately leave the cluster with an inconsistent
1629            generation id to allow us to abort recovery at any stage and
1630            just restart it from scratch.
1631          */
1632         vnnmap->generation = generation;
1633         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1634         if (ret != 0) {
1635                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1636                 return -1;
1637         }
1638
1639         data.dptr = (void *)&generation;
1640         data.dsize = sizeof(uint32_t);
1641
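        /* start a recovery transaction on all active nodes so that the
           wipe and push of each database below happens atomically */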
1642         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1643         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1644                                         nodes, 0,
1645                                         CONTROL_TIMEOUT(), false, data,
1646                                         NULL,
1647                                         transaction_start_fail_callback,
1648                                         rec) != 0) {
1649                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1650                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1651                                         nodes, 0,
1652                                         CONTROL_TIMEOUT(), false, tdb_null,
1653                                         NULL,
1654                                         NULL,
1655                                         NULL) != 0) {
1656                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1657                 }
1658                 return -1;
1659         }
1660
1661         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1662
1663         for (i=0;i<dbmap->num;i++) {
1664                 ret = recover_database(rec, mem_ctx,
1665                                        dbmap->dbs[i].dbid,
1666                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1667                                        pnn, nodemap, generation);
1668                 if (ret != 0) {
1669                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1670                         return -1;
1671                 }
1672         }
1673
1674         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1675
1676         /* commit all the changes */
1677         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1678                                         nodes, 0,
1679                                         CONTROL_TIMEOUT(), false, data,
1680                                         NULL, NULL,
1681                                         NULL) != 0) {
1682                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1683                 return -1;
1684         }
1685
1686         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1687         
1688
1689         /* update the capabilities for all nodes */
1690         ret = update_capabilities(ctdb, nodemap);
1691         if (ret!=0) {
1692                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1693                 return -1;
1694         }
1695
1696         /* build a new vnn map with all the currently active and
1697            unbanned nodes */
1698         generation = new_generation();
1699         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1700         CTDB_NO_MEMORY(ctdb, vnnmap);
1701         vnnmap->generation = generation;
1702         vnnmap->size = 0;
1703         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1704         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1705         for (i=j=0;i<nodemap->num;i++) {
1706                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1707                         continue;
1708                 }
1709                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1710                         /* this node can not be an lmaster */
1711                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1712                         continue;
1713                 }
1714
1715                 vnnmap->size++;
1716                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1717                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1718                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1719
1720         }
1721         if (vnnmap->size == 0) {
1722                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1723                 vnnmap->size++;
1724                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1725                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1726                 vnnmap->map[0] = pnn;
1727         }       
1728
1729         /* update to the new vnnmap on all nodes */
1730         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1731         if (ret != 0) {
1732                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1733                 return -1;
1734         }
1735
1736         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1737
1738         /* update recmaster to point to us for all nodes */
1739         ret = set_recovery_master(ctdb, nodemap, pnn);
1740         if (ret!=0) {
1741                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1742                 return -1;
1743         }
1744
1745         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1746
1747         /*
1748           update all nodes to have the same flags that we have
1749          */
1750         for (i=0;i<nodemap->num;i++) {
1751                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1752                         continue;
1753                 }
1754
1755                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1756                 if (ret != 0) {
1757                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1758                         return -1;
1759                 }
1760         }
1761
1762         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1763
1764         /* disable recovery mode */
1765         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1766         if (ret != 0) {
1767                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1768                 return -1;
1769         }
1770
1771         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1772
1773         /*
1774           tell nodes to takeover their public IPs
1775          */
1776         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1777         if (ret != 0) {
1778                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1779                                  culprit));
1780                 rec->need_takeover_run = true;
1781                 return -1;
1782         }
1783         rec->need_takeover_run = false;
1784         ret = ctdb_takeover_run(ctdb, nodemap);
1785         if (ret != 0) {
1786                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1787                 rec->need_takeover_run = true;
1788         }
1789
1790         /* execute the "recovered" event script on all nodes */
1791         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
1792         if (ret!=0) {
1793                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1794                 return -1;
1795         }
1796
1797         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1798
1799         /* send a message to all clients telling them that the cluster 
1800            has been reconfigured */
1801         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1802
1803         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1804
1805         rec->need_recovery = false;
1806
1807         /* we managed to complete a full recovery, make sure to forgive
1808            any past sins by the nodes that could now participate in the
1809            recovery.
1810         */
1811         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1812         for (i=0;i<nodemap->num;i++) {
1813                 struct ctdb_banning_state *ban_state;
1814
1815                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1816                         continue;
1817                 }
1818
1819                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1820                 if (ban_state == NULL) {
1821                         continue;
1822                 }
1823
1824                 ban_state->count = 0;
1825         }
1826
1827
1828         /* We just finished a recovery successfully. 
1829            We now wait for rerecovery_timeout before we allow 
1830            another recovery to take place.
1831         */
1832         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1833         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1834         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1835
1836         return 0;
1837 }
1838
1839
1840 /*
1841   elections are won by first checking the number of connected nodes, then
1842   the priority time, then the pnn
1843  */
1844 struct election_message {
1845         uint32_t num_connected;
1846         struct timeval priority_time;
1847         uint32_t pnn;
1848         uint32_t node_flags;
1849 };
1850
1851 /*
1852   form this node's election data
1853  */
1854 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1855 {
1856         int ret, i;
1857         struct ctdb_node_map *nodemap;
1858         struct ctdb_context *ctdb = rec->ctdb;
1859
1860         ZERO_STRUCTP(em);
1861
1862         em->pnn = rec->ctdb->pnn;
1863         em->priority_time = rec->priority_time;
1864
1865         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1866         if (ret != 0) {
1867                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1868                 return;
1869         }
1870
1871         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1872         em->node_flags = rec->node_flags;
1873
1874         for (i=0;i<nodemap->num;i++) {
1875                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1876                         em->num_connected++;
1877                 }
1878         }
1879
1880         /* we shouldn't try to win this election if we can't be a recmaster */
1881         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1882                 em->num_connected = 0;
1883                 em->priority_time = timeval_current();
1884         }
1885
1886         talloc_free(nodemap);
1887 }
1888
1889 /*
1890   see if the given election data wins
1891  */
1892 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1893 {
1894         struct election_message myem;
1895         int cmp = 0;
1896
1897         ctdb_election_data(rec, &myem);
1898
1899         /* we can't win if we don't have the recmaster capability */
1900         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1901                 return false;
1902         }
1903
1904         /* we can't win if we are banned */
1905         if (rec->node_flags & NODE_FLAGS_BANNED) {
1906                 return false;
1907         }       
1908
1909         /* we can't win if we are stopped */
1910         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1911                 return false;
1912         }       
1913
1914         /* we will automatically win if the other node is banned */
1915         if (em->node_flags & NODE_FLAGS_BANNED) {
1916                 return true;
1917         }
1918
1919         /* we will automatically win if the other node is stopped */
1920         if (em->node_flags & NODE_FLAGS_STOPPED) {
1921                 return true;
1922         }
1923
1924         /* try to use the most connected node */
1925         if (cmp == 0) {
1926                 cmp = (int)myem.num_connected - (int)em->num_connected;
1927         }
1928
1929         /* then the longest running node */
1930         if (cmp == 0) {
1931                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1932         }
1933
1934         if (cmp == 0) {
1935                 cmp = (int)myem.pnn - (int)em->pnn;
1936         }
1937
1938         return cmp > 0;
1939 }
1940
1941 /*
1942   send out an election request
1943  */
1944 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1945 {
1946         int ret;
1947         TDB_DATA election_data;
1948         struct election_message emsg;
1949         uint64_t srvid;
1950         struct ctdb_context *ctdb = rec->ctdb;
1951
1952         srvid = CTDB_SRVID_RECOVERY;
1953
1954         ctdb_election_data(rec, &emsg);
1955
1956         election_data.dsize = sizeof(struct election_message);
1957         election_data.dptr  = (unsigned char *)&emsg;
1958
1959
1960         /* send an election message to all active nodes */
1961         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1962         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1963
1964
1965         /* A new node that is already frozen has entered the cluster.
1966            The existing nodes are not frozen and don't need to be frozen
1967            until the election has ended and we start the actual recovery
1968         */
1969         if (update_recmaster == true) {
1970                 /* first we assume we will win the election and set 
1971                    recoverymaster to be ourself on the current node
1972                  */
1973                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1974                 if (ret != 0) {
1975                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1976                         return -1;
1977                 }
1978         }
1979
1980
1981         return 0;
1982 }
1983
1984 /*
1985   this function will unban all nodes in the cluster
1986 */
1987 static void unban_all_nodes(struct ctdb_context *ctdb)
1988 {
1989         int ret, i;
1990         struct ctdb_node_map *nodemap;
1991         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1992         
1993         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1994         if (ret != 0) {
1995                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1996                 return;
1997         }
1998
1999         for (i=0;i<nodemap->num;i++) {
2000                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2001                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2002                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
2003                 }
2004         }
2005
2006         talloc_free(tmp_ctx);
2007 }
2008
2009
2010 /*
2011   we think we are winning the election - send a broadcast election request
2012  */
2013 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2014 {
2015         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2016         int ret;
2017
2018         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
2019         if (ret != 0) {
2020                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2021         }
2022
2023         talloc_free(rec->send_election_te);
2024         rec->send_election_te = NULL;
2025 }
2026
2027 /*
2028   handler for memory dumps
2029 */
2030 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2031                              TDB_DATA data, void *private_data)
2032 {
2033         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2034         TDB_DATA *dump;
2035         int ret;
2036         struct rd_memdump_reply *rd;
2037
2038         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2039                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2040                 talloc_free(tmp_ctx);
2041                 return;
2042         }
2043         rd = (struct rd_memdump_reply *)data.dptr;
2044
2045         dump = talloc_zero(tmp_ctx, TDB_DATA);
2046         if (dump == NULL) {
2047                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2048                 talloc_free(tmp_ctx);
2049                 return;
2050         }
2051         ret = ctdb_dump_memory(ctdb, dump);
2052         if (ret != 0) {
2053                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2054                 talloc_free(tmp_ctx);
2055                 return;
2056         }
2057
2058         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
2059
2060         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2061         if (ret != 0) {
2062                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2063                 talloc_free(tmp_ctx);
2064                 return;
2065         }
2066
2067         talloc_free(tmp_ctx);
2068 }
2069
2070 /*
2071   handler for reload_nodes
2072 */
2073 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2074                              TDB_DATA data, void *private_data)
2075 {
2076         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2077
2078         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2079
2080         reload_nodes_file(rec->ctdb);
2081 }
2082
2083
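/*
  timed event handler: re-enable the public ip consistency checks after
  a temporary disable has expired
*/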
2084 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
2085                               struct timeval yt, void *p)
2086 {
2087         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2088
2089         talloc_free(rec->ip_check_disable_ctx);
2090         rec->ip_check_disable_ctx = NULL;
2091 }
2092
2093
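/*
  timed event handler: the deferred rebalance period has expired, run the
  takeover now
*/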
2094 static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
2095                                   struct timeval t, void *p)
2096 {
2097         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2098         struct ctdb_context *ctdb = rec->ctdb;
2099         int ret;
2100
2101         DEBUG(DEBUG_NOTICE,("Rebalance all nodes that have had ip assignment changes.\n"));
2102
2103         ret = ctdb_takeover_run(ctdb, rec->nodemap);
2104         if (ret != 0) {
2105                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
2106                 rec->need_takeover_run = true;
2107         }
2108
2109         talloc_free(rec->deferred_rebalance_ctx);
2110         rec->deferred_rebalance_ctx = NULL;
2111 }
2112
2113         
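/*
  handler for node rebalance requests: force lcp2 to rebalance the ips of
  the given node and defer the actual takeover run for the configured
  rebalance period
*/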
2114 static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2115                              TDB_DATA data, void *private_data)
2116 {
2117         uint32_t pnn;
2118         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2119
2120         if (data.dsize != sizeof(uint32_t)) {
2121                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2122                 return;
2123         }
2124
2125         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2126                 return;
2127         }
2128
2129         pnn = *(uint32_t *)&data.dptr[0];
2130
2131         lcp2_forcerebalance(ctdb, pnn);
2132         DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
2133
2134         if (rec->deferred_rebalance_ctx != NULL) {
2135                 talloc_free(rec->deferred_rebalance_ctx);
2136         }
2137         rec->deferred_rebalance_ctx = talloc_new(rec);
2138         event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
2139                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2140                         ctdb_rebalance_timeout, rec);
2141 }
2142
2143
2144
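/*
  handler for ip update messages: the recovery master records the new
  assignment in its ip assignment tree
*/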
2145 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2146                              TDB_DATA data, void *private_data)
2147 {
2148         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2149         struct ctdb_public_ip *ip;
2150
2151         if (rec->recmaster != rec->ctdb->pnn) {
2152                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2153                 return;
2154         }
2155
2156         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2157                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2158                 return;
2159         }
2160
2161         ip = (struct ctdb_public_ip *)data.dptr;
2162
2163         update_ip_assignment_tree(rec->ctdb, ip);
2164 }
2165
2166
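/*
  handler for requests to temporarily disable the public ip checks;
  a timeout of zero re-enables them immediately
*/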
2167 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2168                              TDB_DATA data, void *private_data)
2169 {
2170         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2171         uint32_t timeout;
2172
2173         if (rec->ip_check_disable_ctx != NULL) {
2174                 talloc_free(rec->ip_check_disable_ctx);
2175                 rec->ip_check_disable_ctx = NULL;
2176         }
2177
2178         if (data.dsize != sizeof(uint32_t)) {
2179                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2180                                  "expecting %lu\n", (long unsigned)data.dsize,
2181                                  (long unsigned)sizeof(uint32_t)));
2182                 return;
2183         }
2184         if (data.dptr == NULL) {
2185                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2186                 return;
2187         }
2188
2189         timeout = *((uint32_t *)data.dptr);
2190
2191         if (timeout == 0) {
2192                 DEBUG(DEBUG_NOTICE,("Reenabling ip check\n"));
2193                 return;
2194         }
2195                 
2196         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
2197
2198         rec->ip_check_disable_ctx = talloc_new(rec);
2199         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2200
2201         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2202 }
2203
2204
2205 /*
2206   handler for reload all ips.
2207 */
2208 static void ip_reloadall_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2209                              TDB_DATA data, void *private_data)
2210 {
2211         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2212
2213         if (data.dsize != sizeof(struct reloadips_all_reply)) {
2214                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2215                 return;
2216         }
2217
2218         reload_all_ips_request = (struct reloadips_all_reply *)talloc_steal(rec, data.dptr);
2219
2220         DEBUG(DEBUG_NOTICE,("RELOAD_ALL_IPS message received from node:%d srvid:%d\n", reload_all_ips_request->pnn, (int)reload_all_ips_request->srvid));
2221         return;
2222 }
2223
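/* callback for the async RELOAD_PUBLIC_IPS controls: flag any per-node failure */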
2224 static void async_reloadips_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2225 {
2226         uint32_t *status = callback_data;
2227
2228         if (res != 0) {
2229                 DEBUG(DEBUG_ERR,("Reload ips all failed on node %d\n", node_pnn));
2230                 *status = 1;
2231         }
2232 }
2233
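/*
  tell all connected nodes to reload their public ip configuration and
  notify the requesting node once the reload has completed
*/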
2234 static int
2235 reload_all_ips(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, struct reloadips_all_reply *rips)
2236 {
2237         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2238         uint32_t *nodes;
2239         uint32_t status;
2240         int i;
2241
2242         DEBUG(DEBUG_ERR,("RELOAD ALL IPS on all active nodes\n"));
2243         for (i = 0; i< nodemap->num; i++) {
2244                 if (nodemap->nodes[i].flags != 0) {
2245                         DEBUG(DEBUG_ERR, ("Can not reload ips on all nodes. Node %d is not up and healthy\n", i));
2246                         talloc_free(tmp_ctx);
2247                         return -1;
2248                 }
2249         }
2250
2251         /* send the flags update to all connected nodes */
2252         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2253         status = 0;
2254         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
2255                                         nodes, 0,
2256                                         CONTROL_TIMEOUT(),
2257                                         false, tdb_null,
2258                                         async_reloadips_callback, NULL,
2259                                         &status) != 0) {
2260                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2261                 talloc_free(tmp_ctx);
2262                 return -1;
2263         }
2264
2265         if (status != 0) {
2266                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2267                 talloc_free(tmp_ctx);
2268                 return -1;
2269         }
2270
2271         ctdb_client_send_message(ctdb, rips->pnn, rips->srvid, tdb_null);
2272
2273         talloc_free(tmp_ctx);
2274         return 0;
2275 }
2276
2277
2278 /*
2279   handler for ip reallocate, just add it to the list of callers and 
2280   handle this later in the monitor_cluster loop so we do not recurse
2281   with other callers to takeover_run()
2282 */
2283 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2284                              TDB_DATA data, void *private_data)
2285 {
2286         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2287         struct ip_reallocate_list *caller;
2288
2289         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2290                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2291                 return;
2292         }
2293
2294         if (rec->ip_reallocate_ctx == NULL) {
2295                 rec->ip_reallocate_ctx = talloc_new(rec);
2296                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2297         }
2298
2299         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2300         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2301
2302         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2303         caller->next = rec->reallocate_callers;
2304         rec->reallocate_callers = caller;
2305
2306         return;
2307 }
2308
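/*
  process the queued ip reallocate requests: refresh the remote public ip
  lists, run the takeover and send the result back to each caller
*/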
2309 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2310 {
2311         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2312         TDB_DATA result;
2313         int32_t ret;
2314         struct ip_reallocate_list *callers;
2315         uint32_t culprit;
2316
2317         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2318
2319         /* update the list of public ips that a node can handle for
2320            all connected nodes
2321         */
2322         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2323         if (ret != 0) {
2324                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2325                                  culprit));
2326                 rec->need_takeover_run = true;
2327         }
2328         if (ret == 0) {
2329                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2330                 if (ret != 0) {
2331                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2332                         rec->need_takeover_run = true;
2333                 }
2334         }
2335
2336         result.dsize = sizeof(int32_t);
2337         result.dptr  = (uint8_t *)&ret;
2338
2339         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2340
2341                 /* Someone that sent srvid==0 does not want a reply */
2342                 if (callers->rd->srvid == 0) {
2343                         continue;
2344                 }
2345                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2346                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2347                                   (unsigned long long)callers->rd->srvid));
2348                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2349                 if (ret != 0) {
2350                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2351                                          "message to %u:%llu\n",
2352                                          (unsigned)callers->rd->pnn,
2353                                          (unsigned long long)callers->rd->srvid));
2354                 }
2355         }
2356
2357         talloc_free(tmp_ctx);
2358         talloc_free(rec->ip_reallocate_ctx);
2359         rec->ip_reallocate_ctx = NULL;
2360         rec->reallocate_callers = NULL;
2361         
2362 }
2363
2364
2365 /*
2366   handler for recovery master elections
2367 */
2368 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2369                              TDB_DATA data, void *private_data)
2370 {
2371         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2372         int ret;
2373         struct election_message *em = (struct election_message *)data.dptr;
2374         TALLOC_CTX *mem_ctx;
2375
2376         /* we got an election packet - update the timeout for the election */
2377         talloc_free(rec->election_timeout);
2378         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2379                                                 fast_start ?
2380                                                 timeval_current_ofs(0, 500000) :
2381                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2382                                                 ctdb_election_timeout, rec);
2383
2384         mem_ctx = talloc_new(ctdb);
2385
2386         /* someone called an election. check their election data
2387            and if we disagree and we would rather be the elected node, 
2388            send a new election message to all other nodes
2389          */
2390         if (ctdb_election_win(rec, em)) {
2391                 if (!rec->send_election_te) {
2392                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2393                                                                 timeval_current_ofs(0, 500000),
2394                                                                 election_send_request, rec);
2395                 }
2396                 talloc_free(mem_ctx);
2397                 /*unban_all_nodes(ctdb);*/
2398                 return;
2399         }
2400         
2401         /* we didn't win */
2402         talloc_free(rec->send_election_te);
2403         rec->send_election_te = NULL;
2404
2405         if (ctdb->tunable.verify_recovery_lock != 0) {
2406                 /* release the recmaster lock */
2407                 if (em->pnn != ctdb->pnn &&
2408                     ctdb->recovery_lock_fd != -1) {
2409                         close(ctdb->recovery_lock_fd);
2410                         ctdb->recovery_lock_fd = -1;
2411                         unban_all_nodes(ctdb);
2412                 }
2413         }
2414
2415         /* ok, let that guy become recmaster then */
2416         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2417         if (ret != 0) {
2418                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2419                 talloc_free(mem_ctx);
2420                 return;
2421         }
2422
2423         talloc_free(mem_ctx);
2424         return;
2425 }
2426
2427
2428 /*
2429   force the start of the election process
2430  */
2431 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2432                            struct ctdb_node_map *nodemap)
2433 {
2434         int ret;
2435         struct ctdb_context *ctdb = rec->ctdb;
2436
2437         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2438
2439         /* set all nodes to recovery mode to stop all internode traffic */
2440         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2441         if (ret != 0) {
2442                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2443                 return;
2444         }
2445
2446         talloc_free(rec->election_timeout);
2447         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2448                                                 fast_start ?
2449                                                 timeval_current_ofs(0, 500000) :
2450                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2451                                                 ctdb_election_timeout, rec);
2452
2453         ret = send_election_request(rec, pnn, true);
2454         if (ret!=0) {
2455                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2456                 return;
2457         }
2458
2459         /* wait for a few seconds to collect all responses */
2460         ctdb_wait_election(rec);
2461 }
2462
2463
2464
2465 /*
2466   handler for when a node changes its flags
2467 */
2468 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2469                             TDB_DATA data, void *private_data)
2470 {
2471         int ret;
2472         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2473         struct ctdb_node_map *nodemap=NULL;
2474         TALLOC_CTX *tmp_ctx;
2475         int i;
2476         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2477         int disabled_flag_changed;
2478
2479         if (data.dsize != sizeof(*c)) {
2480                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2481                 return;
2482         }
2483
2484         tmp_ctx = talloc_new(ctdb);
2485         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2486
2487         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2488         if (ret != 0) {
2489                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2490                 talloc_free(tmp_ctx);
2491                 return;         
2492         }
2493
2494
2495         for (i=0;i<nodemap->num;i++) {
2496                 if (nodemap->nodes[i].pnn == c->pnn) break;
2497         }
2498
2499         if (i == nodemap->num) {
2500                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2501                 talloc_free(tmp_ctx);
2502                 return;
2503         }
2504
2505         if (nodemap->nodes[i].flags != c->new_flags) {
2506                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, nodemap->nodes[i].flags));
2507         }
2508
2509         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2510
2511         nodemap->nodes[i].flags = c->new_flags;
2512
2513         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2514                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2515
2516         if (ret == 0) {
2517                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2518                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2519         }
2520         
2521         if (ret == 0 &&
2522             ctdb->recovery_master == ctdb->pnn &&
2523             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2524                 /* Only do the takeover run if the perm disabled or unhealthy
2525                    flags changed since these will cause an ip failover but not
2526                    a recovery.
2527                    If the node became disconnected or banned this will also
2528                    lead to an ip address failover but that is handled 
2529                    during recovery
2530                 */
2531                 if (disabled_flag_changed) {
2532                         rec->need_takeover_run = true;
2533                 }
2534         }
2535
2536         talloc_free(tmp_ctx);
2537 }
2538
2539 /*
2540   handler for when we need to push out flag changes to all other nodes
2541 */
2542 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2543                             TDB_DATA data, void *private_data)
2544 {
2545         int ret;
2546         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2547         struct ctdb_node_map *nodemap=NULL;
2548         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2549         uint32_t recmaster;
2550         uint32_t *nodes;
2551
2552         /* find the recovery master */
2553         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2554         if (ret != 0) {
2555                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2556                 talloc_free(tmp_ctx);
2557                 return;
2558         }
2559
2560         /* read the node flags from the recmaster */
2561         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2562         if (ret != 0) {
2563                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2564                 talloc_free(tmp_ctx);
2565                 return;
2566         }
2567         if (c->pnn >= nodemap->num) {
2568                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2569                 talloc_free(tmp_ctx);
2570                 return;
2571         }
2572
2573         /* send the flags update to all connected nodes */
2574         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2575
2576         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2577                                       nodes, 0, CONTROL_TIMEOUT(),
2578                                       false, data,
2579                                       NULL, NULL,
2580                                       NULL) != 0) {
2581                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2582
2583                 talloc_free(tmp_ctx);
2584                 return;
2585         }
2586
2587         talloc_free(tmp_ctx);
2588 }
2589
2590
2591 struct verify_recmode_normal_data {
2592         uint32_t count;
2593         enum monitor_result status;
2594 };
2595
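/* callback for the async getrecmode controls sent by verify_recmode() */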
2596 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2597 {
2598         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2599
2600
2601         /* one more node has responded with recmode data */
2602         rmdata->count--;
2603
2604         /* if we failed to get the recmode, then return an error and let
2605            the main loop try again.
2606         */
2607         if (state->state != CTDB_CONTROL_DONE) {
2608                 if (rmdata->status == MONITOR_OK) {
2609                         rmdata->status = MONITOR_FAILED;
2610                 }
2611                 return;
2612         }
2613
2614         /* if we got a response, then the recmode will be stored in the
2615            status field
2616         */
2617         if (state->status != CTDB_RECOVERY_NORMAL) {
2618                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2619                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2620         }
2621
2622         return;
2623 }
2624
2625
2626 /* verify that all nodes are in normal recovery mode */
2627 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2628 {
2629         struct verify_recmode_normal_data *rmdata;
2630         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2631         struct ctdb_client_control_state *state;
2632         enum monitor_result status;
2633         int j;
2634         
2635         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2636         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2637         rmdata->count  = 0;
2638         rmdata->status = MONITOR_OK;
2639
2640         /* loop over all active nodes and send an async getrecmode call to 
2641            them */
2642         for (j=0; j<nodemap->num; j++) {
2643                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2644                         continue;
2645                 }
2646                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2647                                         CONTROL_TIMEOUT(), 
2648                                         nodemap->nodes[j].pnn);
2649                 if (state == NULL) {
2650                         /* we failed to send the control, treat this as 
2651                            an error and try again next iteration
2652                         */                      
2653                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2654                         talloc_free(mem_ctx);
2655                         return MONITOR_FAILED;
2656                 }
2657
2658                 /* set up the callback functions */
2659                 state->async.fn = verify_recmode_normal_callback;
2660                 state->async.private_data = rmdata;
2661
2662                 /* one more control to wait for to complete */
2663                 rmdata->count++;
2664         }
2665
2666
2667         /* now wait for up to the maximum number of seconds allowed
2668            or until all nodes we expect a response from have replied
2669         */
2670         while (rmdata->count > 0) {
2671                 event_loop_once(ctdb->ev);
2672         }
2673
2674         status = rmdata->status;
2675         talloc_free(mem_ctx);
2676         return status;
2677 }
2678
2679
2680 struct verify_recmaster_data {
2681         struct ctdb_recoverd *rec;
2682         uint32_t count;
2683         uint32_t pnn;
2684         enum monitor_result status;
2685 };
2686
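/* callback for the async getrecmaster controls sent by verify_recmaster() */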
2687 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2688 {
2689         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2690
2691
2692         /* one more node has responded with recmaster data */
2693         rmdata->count--;
2694
2695         /* if we failed to get the recmaster, then return an error and let
2696            the main loop try again.
2697         */
2698         if (state->state != CTDB_CONTROL_DONE) {
2699                 if (rmdata->status == MONITOR_OK) {
2700                         rmdata->status = MONITOR_FAILED;
2701                 }
2702                 return;
2703         }
2704
2705         /* if we got a response, then the recmaster will be stored in the
2706            status field
2707         */
2708         if (state->status != rmdata->pnn) {
2709                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2710                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2711                 rmdata->status = MONITOR_ELECTION_NEEDED;
2712         }
2713
2714         return;
2715 }
2716
2717
2718 /* verify that all nodes agree that we are the recmaster */
2719 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2720 {
2721         struct ctdb_context *ctdb = rec->ctdb;
2722         struct verify_recmaster_data *rmdata;
2723         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2724         struct ctdb_client_control_state *state;
2725         enum monitor_result status;
2726         int j;
2727         
2728         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2729         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2730         rmdata->rec    = rec;
2731         rmdata->count  = 0;
2732         rmdata->pnn    = pnn;
2733         rmdata->status = MONITOR_OK;
2734
2735         /* loop over all active nodes and send an async getrecmaster call to 
2736            them */
2737         for (j=0; j<nodemap->num; j++) {
2738                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2739                         continue;
2740                 }
2741                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2742                                         CONTROL_TIMEOUT(),
2743                                         nodemap->nodes[j].pnn);
2744                 if (state == NULL) {
2745                         /* we failed to send the control, treat this as 
2746                            an error and try again next iteration
2747                         */                      
2748                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2749                         talloc_free(mem_ctx);
2750                         return MONITOR_FAILED;
2751                 }
2752
2753                 /* set up the callback functions */
2754                 state->async.fn = verify_recmaster_callback;
2755                 state->async.private_data = rmdata;
2756
2757                 /* one more control to wait for to complete */
2758                 rmdata->count++;
2759         }
2760
2761
2762         /* now wait for up to the maximum number of seconds allowed
2763            or until all nodes we expect a response from have replied
2764         */
2765         while (rmdata->count > 0) {
2766                 event_loop_once(ctdb->ev);
2767         }
2768
2769         status = rmdata->status;
2770         talloc_free(mem_ctx);
2771         return status;
2772 }
2773
2774
2775 /* called to check that the local allocation of public ip addresses is ok.
2776 */
2777 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2778 {
2779         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2780         struct ctdb_control_get_ifaces *ifaces = NULL;
2781         struct ctdb_uptime *uptime1 = NULL;
2782         struct ctdb_uptime *uptime2 = NULL;
2783         int ret, j;
2784         bool need_iface_check = false;
2785         bool need_takeover_run = false;
2786
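        /* Snapshot the local uptime (which records when the last recovery
           started and finished) before and after reading the interface
           information.  If a recovery starts or finishes in between, the
           data may be stale and the checks below are skipped for this
           iteration. */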
2787         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2788                                 CTDB_CURRENT_NODE, &uptime1);
2789         if (ret != 0) {
2790                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2791                 talloc_free(mem_ctx);
2792                 return -1;
2793         }
2794
2795
2796         /* read the interfaces from the local node */
2797         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2798         if (ret != 0) {
2799                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2800                 talloc_free(mem_ctx);
2801                 return -1;
2802         }
2803
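        /* Compare the freshly read interface list against the copy cached
           in the recovery daemon; any difference in the number of
           interfaces or their recorded state forces a takeover run. */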
2804         if (!rec->ifaces) {
2805                 need_iface_check = true;
2806         } else if (rec->ifaces->num != ifaces->num) {
2807                 need_iface_check = true;
2808         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2809                 need_iface_check = true;
2810         }
2811
2812         talloc_free(rec->ifaces);
2813         rec->ifaces = talloc_steal(rec, ifaces);
2814
2815         if (need_iface_check) {
2816                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2817                                      "local node %u - force takeover run\n",
2818                                      pnn));
2819                 need_takeover_run = true;
2820         }
2821
2822         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2823                                 CTDB_CURRENT_NODE, &uptime2);
2824         if (ret != 0) {
2825                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2826                 talloc_free(mem_ctx);
2827                 return -1;
2828         }
2829
2830         /* skip the check if the startrecovery time has changed */
2831         if (timeval_compare(&uptime1->last_recovery_started,
2832                             &uptime2->last_recovery_started) != 0) {
2833                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2834                 talloc_free(mem_ctx);
2835                 return 0;
2836         }
2837
2838         /* skip the check if the endrecovery time has changed */
2839         if (timeval_compare(&uptime1->last_recovery_finished,
2840                             &uptime2->last_recovery_finished) != 0) {
2841                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2842                 talloc_free(mem_ctx);
2843                 return 0;
2844         }
2845
2846         /* skip the check if we have started but not finished recovery */
2847         if (timeval_compare(&uptime1->last_recovery_finished,
2848                             &uptime1->last_recovery_started) != 1) {
2849                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2850                 talloc_free(mem_ctx);
2851
2852                 return 0;
2853         }
2854
2855         /* Verify that we have the IP addresses we should have
2856            and that we don't have ones we shouldn't have.
2857            If we find an inconsistency we notify the recovery master
2858            so that it performs a takeover run rather than fixing
2859            things up locally.
2860            Also, if an available public IP is unhosted (pnn == -1) and
2861            we are healthy and could host it, we request an IP reallocation.
2862         */
2863         if (ctdb->tunable.disable_ip_failover == 0) {
2864                 struct ctdb_all_public_ips *ips = NULL;
2865
2866                 /* read the *available* IPs from the local node */
2867                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
2868                 if (ret != 0) {
2869                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
2870                         talloc_free(mem_ctx);
2871                         return -1;
2872                 }
2873
2874                 for (j=0; j<ips->num; j++) {
2875                         if (ips->ips[j].pnn == -1 &&
2876                             nodemap->nodes[pnn].flags == 0) {
2877                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
2878                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
2879                                 need_takeover_run = true;
2880                         }
2881                 }
2882
2883                 talloc_free(ips);
2884
2885                 /* read the *known* IPs from the local node */
2886                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
2887                 if (ret != 0) {
2888                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
2889                         talloc_free(mem_ctx);
2890                         return -1;
2891                 }
2892
2893                 for (j=0; j<ips->num; j++) {
2894                         if (ips->ips[j].pnn == pnn) {
2895                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
2896                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
2897                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2898                                         need_takeover_run = true;
2899                                 }
2900                         } else {
2901                                 if (ctdb->do_checkpublicip &&
2902                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
2903
2904                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
2905                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2906
2907                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
2908                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
2909                                         }
2910                                 }
2911                         }
2912                 }
2913         }
2914
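        /* Something was inconsistent: ask the recovery master, via the
           CTDB_SRVID_TAKEOVER_RUN message, to perform an IP takeover run. */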
2915         if (need_takeover_run) {
2916                 struct takeover_run_reply rd;
2917                 TDB_DATA data;
2918
2919                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2920
2921                 rd.pnn = ctdb->pnn;
2922                 rd.srvid = 0;
2923                 data.dptr = (uint8_t *)&rd;
2924                 data.dsize = sizeof(rd);
2925
2926                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2927                 if (ret != 0) {
2928                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2929                 }
2930         }
2931         talloc_free(mem_ctx);
2932         return 0;
2933 }
2934
2935
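/* Callback for the async GET_NODEMAP control: stash each remote node's
   nodemap, indexed by its pnn, into the caller-supplied array. */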
2936 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2937 {
2938         struct ctdb_node_map **remote_nodemaps = callback_data;
2939
2940         if (node_pnn >= ctdb->num_nodes) {
2941                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2942                 return;
2943         }
2944
2945         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2946
2947 }
2948
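/* Fetch the nodemap from every active node by broadcasting a single
   asynchronous GET_NODEMAP control. */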
2949 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2950         struct ctdb_node_map *nodemap,
2951         struct ctdb_node_map **remote_nodemaps)
2952 {
2953         uint32_t *nodes;
2954
2955         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2956         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2957                                         nodes, 0,
2958                                         CONTROL_TIMEOUT(), false, tdb_null,
2959                                         async_getnodemap_callback,
2960                                         NULL,
2961                                         remote_nodemaps) != 0) {
2962                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2963
2964                 return -1;
2965         }
2966
2967         return 0;
2968 }
2969
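/* State used to check the recovery lock file from a child process: a pipe
   for the child's status byte, a timeout event, an fd event for the pipe
   and the overall result of the check. */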
2970 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2971 struct ctdb_check_reclock_state {
2972         struct ctdb_context *ctdb;
2973         struct timeval start_time;
2974         int fd[2];
2975         pid_t child;
2976         struct timed_event *te;
2977         struct fd_event *fde;
2978         enum reclock_child_status status;
2979 };
2980
2981 /* when we free the reclock state we must kill any child process.
2982 */
2983 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2984 {
2985         struct ctdb_context *ctdb = state->ctdb;
2986
2987         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2988
2989         if (state->fd[0] != -1) {
2990                 close(state->fd[0]);
2991                 state->fd[0] = -1;
2992         }
2993         if (state->fd[1] != -1) {
2994                 close(state->fd[1]);
2995                 state->fd[1] = -1;
2996         }
2997         ctdb_kill(ctdb, state->child, SIGKILL);
2998         return 0;
2999 }
3000
3001 /*
3002   called if our check_reclock child times out. this would happen if
3003   i/o to the reclock file blocks.
3004  */
3005 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
3006                                          struct timeval t, void *private_data)
3007 {
3008         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
3009                                            struct ctdb_check_reclock_state);
3010
3011         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - CFS slow to grant locks?\n"));
3012         state->status = RECLOCK_TIMEOUT;
3013 }
3014
3015 /* this is called when the child process has completed checking the reclock
3016    file and has written data back to us through the pipe.
3017 */
3018 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
3019                              uint16_t flags, void *private_data)
3020 {
3021         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
3022                                              struct ctdb_check_reclock_state);
3023         char c = 0;
3024         int ret;
3025
3026         /* we got a response from our child process so we can abort the
3027            timeout.
3028         */
3029         talloc_free(state->te);
3030         state->te = NULL;
3031
3032         ret = read(state->fd[0], &c, 1);
3033         if (ret != 1 || c != RECLOCK_OK) {
3034                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3035                 state->status = RECLOCK_FAILED;
3036
3037                 return;
3038         }
3039
3040         state->status = RECLOCK_OK;
3041         return;
3042 }
3043
3044 static int check_recovery_lock(struct ctdb_context *ctdb)
3045 {
3046         int ret;
3047         struct ctdb_check_reclock_state *state;
3048         pid_t parent = getpid();
3049
3050         if (ctdb->recovery_lock_fd == -1) {
3051                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3052                 return -1;
3053         }
3054
3055         state = talloc(ctdb, struct ctdb_check_reclock_state);
3056         CTDB_NO_MEMORY(ctdb, state);
3057
3058         state->ctdb = ctdb;
3059         state->start_time = timeval_current();
3060         state->status = RECLOCK_CHECKING;
3061         state->fd[0] = -1;
3062         state->fd[1] = -1;
3063
3064         ret = pipe(state->fd);
3065         if (ret != 0) {
3066                 talloc_free(state);
3067                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3068                 return -1;
3069         }
3070
3071         state->child = ctdb_fork(ctdb);
3072         if (state->child == (pid_t)-1) {
3073                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3074                 close(state->fd[0]);
3075                 state->fd[0] = -1;
3076                 close(state->fd[1]);
3077                 state->fd[1] = -1;
3078                 talloc_free(state);
3079                 return -1;
3080         }
3081
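        /* Child process: perform the potentially blocking read of the
           recovery lock file and report the result back through the pipe,
           so that a hung cluster filesystem cannot block the recovery
           daemon itself.  The child exits once the parent goes away. */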
3082         if (state->child == 0) {
3083                 char cc = RECLOCK_OK;
3084                 close(state->fd[0]);
3085                 state->fd[0] = -1;
3086
3087                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3088                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3089                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3090                         cc = RECLOCK_FAILED;
3091                 }
3092
3093                 write(state->fd[1], &cc, 1);
3094                 /* make sure we die when our parent dies */
3095                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3096                         sleep(5);
3097                         write(state->fd[1], &cc, 1);
3098                 }
3099                 _exit(0);
3100         }
3101         close(state->fd[1]);
3102         state->fd[1] = -1;
3103         set_close_on_exec(state->fd[0]);
3104
3105         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3106
3107         talloc_set_destructor(state, check_reclock_destructor);
3108
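        /* If the child does not report back within 15 seconds, treat the
           reclock check as timed out (see ctdb_check_reclock_timeout). */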
3109         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3110                                     ctdb_check_reclock_timeout, state);
3111         if (state->te == NULL) {
3112                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3113                 talloc_free(state);
3114                 return -1;
3115         }
3116
3117         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3118                                 EVENT_FD_READ,
3119                                 reclock_child_handler,
3120                                 (void *)state);
3121
3122         if (state->fde == NULL) {
3123                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3124                 talloc_free(state);
3125                 return -1;
3126         }
3127         tevent_fd_set_auto_close(state->fde);
3128
3129         while (state->status == RECLOCK_CHECKING) {
3130                 event_loop_once(ctdb->ev);
3131         }
3132
3133         if (state->status == RECLOCK_FAILED) {
3134                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3135                 close(ctdb->recovery_lock_fd);
3136                 ctdb->recovery_lock_fd = -1;
3137                 talloc_free(state);
3138                 return -1;
3139         }
3140
3141         talloc_free(state);
3142         return 0;
3143 }
3144
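/* Fetch the current reclock file setting from the main daemon and update
   the recovery daemon's local copy.  If the reclock file has been disabled
   or changed, close any stale recovery lock fd and turn off lock
   verification until the new setting takes effect. */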
3145 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3146 {
3147         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3148         const char *reclockfile;
3149
3150         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3151                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3152                 talloc_free(tmp_ctx);
3153                 return -1;      
3154         }
3155
3156         if (reclockfile == NULL) {
3157                 if (ctdb->recovery_lock_file != NULL) {
3158                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3159                         talloc_free(ctdb->recovery_lock_file);
3160                         ctdb->recovery_lock_file = NULL;
3161                         if (ctdb->recovery_lock_fd != -1) {
3162                                 close(ctdb->recovery_lock_fd);
3163                                 ctdb->recovery_lock_fd = -1;
3164                         }
3165                 }
3166                 ctdb->tunable.verify_recovery_lock = 0;
3167                 talloc_free(tmp_ctx);
3168                 return 0;
3169         }
3170
3171         if (ctdb->recovery_lock_file == NULL) {
3172                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3173                 if (ctdb->recovery_lock_fd != -1) {
3174                         close(ctdb->recovery_lock_fd);
3175                         ctdb->recovery_lock_fd = -1;
3176                 }
3177                 talloc_free(tmp_ctx);
3178                 return 0;
3179         }
3180
3181
3182         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3183                 talloc_free(tmp_ctx);
3184                 return 0;
3185         }
3186
3187         talloc_free(ctdb->recovery_lock_file);
3188         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3189         ctdb->tunable.verify_recovery_lock = 0;
3190         if (ctdb->recovery_lock_fd != -1) {
3191                 close(ctdb->recovery_lock_fd);
3192                 ctdb->recovery_lock_fd = -1;
3193         }
3194
3195         talloc_free(tmp_ctx);
3196         return 0;
3197 }
3198
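/* One iteration of the recovery daemon's monitoring logic: verify that the
   main daemon is alive, refresh tunables, node map and vnn map, make sure a
   sane recovery master exists, and trigger elections, recoveries or takeover
   runs as required. */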
3199 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3200                       TALLOC_CTX *mem_ctx)
3201 {
3202         uint32_t pnn;
3203         struct ctdb_node_map *nodemap=NULL;
3204         struct ctdb_node_map *recmaster_nodemap=NULL;
3205         struct ctdb_node_map **remote_nodemaps=NULL;
3206         struct ctdb_vnn_map *vnnmap=NULL;
3207         struct ctdb_vnn_map *remote_vnnmap=NULL;
3208         int32_t debug_level;
3209         int i, j, ret;
3210
3211
3212
3213         /* verify that the main daemon is still running */
3214         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3215                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3216                 exit(-1);
3217         }
3218
3219         /* ping the local daemon to tell it we are alive */
3220         ctdb_ctrl_recd_ping(ctdb);
3221
3222         if (rec->election_timeout) {
3223                 /* an election is in progress */
3224                 return;
3225         }
3226
3227         /* read the debug level from the parent and update locally */
3228         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3229         if (ret !=0) {
3230                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3231                 return;
3232         }
3233         LogLevel = debug_level;
3234
3235
3236         /* We must check if we need to ban a node here but we want to do this
3237            as early as possible so we don't wait until we have pulled the node
3238            map from the local node. That's why we have the hardcoded value 20
3239         */
3240         for (i=0; i<ctdb->num_nodes; i++) {
3241                 struct ctdb_banning_state *ban_state;
3242
3243                 if (ctdb->nodes[i]->ban_state == NULL) {
3244                         continue;
3245                 }
3246                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
3247                 if (ban_state->count < 20) {
3248                         continue;
3249                 }
3250                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
3251                         ctdb->nodes[i]->pnn, ban_state->count,
3252                         ctdb->tunable.recovery_ban_period));
3253                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
3254                 ban_state->count = 0;
3255         }
3256
3257         /* get relevant tunables */
3258         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3259         if (ret != 0) {
3260                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3261                 return;
3262         }
3263
3264         /* get the current recovery lock file from the server */
3265         if (update_recovery_lock_file(ctdb) != 0) {
3266                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3267                 return;
3268         }
3269
3270         /* Make sure that if recovery lock verification becomes disabled,
3271            we close the file
3272         */
3273         if (ctdb->tunable.verify_recovery_lock == 0) {
3274                 if (ctdb->recovery_lock_fd != -1) {
3275                         close(ctdb->recovery_lock_fd);
3276                         ctdb->recovery_lock_fd = -1;
3277                 }
3278         }
3279
3280         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3281         if (pnn == (uint32_t)-1) {
3282                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
3283                 return;
3284         }
3285
3286         /* get the vnnmap */
3287         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3288         if (ret != 0) {
3289                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3290                 return;
3291         }
3292
3293
3294         /* get number of nodes */
3295         if (rec->nodemap) {
3296                 talloc_free(rec->nodemap);
3297                 rec->nodemap = NULL;
3298                 nodemap=NULL;
3299         }
3300         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3301         if (ret != 0) {
3302                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3303                 return;
3304         }
3305         nodemap = rec->nodemap;
3306
3307         /* update the capabilities for all nodes */
3308         ret = update_capabilities(ctdb, nodemap);
3309         if (ret != 0) {
3310                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3311                 return;
3312         }
3313
3314         /* check which node is the recovery master */
3315         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3316         if (ret != 0) {
3317                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3318                 return;
3319         }
3320
3321         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3322         if (rec->recmaster != pnn) {
3323                 if (rec->ip_reallocate_ctx != NULL) {
3324                         talloc_free(rec->ip_reallocate_ctx);
3325                         rec->ip_reallocate_ctx = NULL;
3326                         rec->reallocate_callers = NULL;
3327                 }
3328         }
3329
3330         if (rec->recmaster == (uint32_t)-1) {
3331                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3332                 force_election(rec, pnn, nodemap);
3333                 return;
3334         }
3335
3336         /* if the local daemon is STOPPED, we verify that the databases are
3337            also frozen and that the recmode is set to active
3338         */
3339         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3340                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3341                 if (ret != 0) {
3342                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3343                 }
3344                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3345                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3346
3347                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3348                         if (ret != 0) {
3349                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3350                                 return;
3351                         }
3352                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3353                         if (ret != 0) {
3354                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3355
3356                                 return;
3357                         }
3358                         return;
3359                 }
3360         }
3361         /* If the local node is inactive (stopped or banned), verify that
3362            we are not the recmaster and yield that role if we are
3363         */
3364         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE) && (rec->recmaster == pnn)) {
3365                 DEBUG(DEBUG_ERR,("Local node is INACTIVE. Yielding recmaster role\n"));
3366                 force_election(rec, pnn, nodemap);
3367                 return;
3368         }
3369         
3370         /*
3371          * if the current recmaster does not have CTDB_CAP_RECMASTER
3372          * but we do, force an election and try to become the new
3373          * recmaster
3374          */
3375         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3376             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3377              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3378                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3379                                   " but we (node %u) have - force an election\n",
3380                                   rec->recmaster, pnn));
3381                 force_election(rec, pnn, nodemap);
3382                 return;
3383         }
3384
3385         /* check that we (recovery daemon) and the local ctdb daemon
3386            agree on whether we are banned or not
3387         */
3388 //qqq
3389
3390         /* remember our own node flags */
3391         rec->node_flags = nodemap->nodes[pnn].flags;
3392
3393         /* count how many active nodes there are */
3394         rec->num_active    = 0;
3395         rec->num_connected = 0;
3396         for (i=0; i<nodemap->num; i++) {
3397                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3398                         rec->num_active++;
3399                 }
3400                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3401                         rec->num_connected++;
3402                 }
3403         }
3404
3405
3406         /* verify that the recmaster node is still active */
3407         for (j=0; j<nodemap->num; j++) {
3408                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3409                         break;
3410                 }
3411         }
3412
3413         if (j == nodemap->num) {
3414                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3415                 force_election(rec, pnn, nodemap);
3416                 return;
3417         }
3418
3419         /* if recovery master is disconnected we must elect a new recmaster */
3420         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3421                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3422                 force_election(rec, pnn, nodemap);
3423                 return;
3424         }
3425
3426         /* get nodemap from the recovery master to check if it is inactive */
3427         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3428                                    mem_ctx, &recmaster_nodemap);
3429         if (ret != 0) {
3430                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3431                           nodemap->nodes[j].pnn));
3432                 return;
3433         }
3434
3435
3436         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3437             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3438                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3439                 force_election(rec, pnn, nodemap);
3440                 return;
3441         }
3442
3443         /* If this node is stopped then it is not the recovery master
3444          * so the only remaining action is potentially to verify
3445          * the local IP allocation below.  This won't accomplish
3446          * anything useful so skip it.
3447          */
3448         if (rec->node_flags & NODE_FLAGS_STOPPED) {
3449                 return;
3450         }
3451
3452         /* verify that we have all the IP addresses we should have and that
3453          * we don't have addresses we shouldn't have.
3454          */ 
3455         if (ctdb->tunable.disable_ip_failover == 0) {
3456                 if (rec->ip_check_disable_ctx == NULL) {
3457                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3458                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3459                         }
3460                 }
3461         }
3462
3463
3464         /* if we are not the recmaster then we do not need to check
3465            if recovery is needed
3466          */
3467         if (pnn != rec->recmaster) {
3468                 return;
3469         }
3470
3471
3472         /* ensure our local copies of flags are right */
3473         ret = update_local_flags(rec, nodemap);
3474         if (ret == MONITOR_ELECTION_NEEDED) {
3475                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3476                 force_election(rec, pnn, nodemap);
3477                 return;
3478         }
3479         if (ret != MONITOR_OK) {
3480                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3481                 return;
3482         }
3483
3484         if (ctdb->num_nodes != nodemap->num) {
3485                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3486                 reload_nodes_file(ctdb);
3487                 return;
3488         }
3489
3490         /* verify that all active nodes agree that we are the recmaster */
3491         switch (verify_recmaster(rec, nodemap, pnn)) {
3492         case MONITOR_RECOVERY_NEEDED:
3493                 /* can not happen */
3494                 return;
3495         case MONITOR_ELECTION_NEEDED:
3496                 force_election(rec, pnn, nodemap);
3497                 return;
3498         case MONITOR_OK:
3499                 break;
3500         case MONITOR_FAILED:
3501                 return;
3502         }
3503
3504
3505         if (rec->need_recovery) {
3506                 /* a previous recovery didn't finish */
3507                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3508                 return;
3509         }
3510
3511         /* verify that all active nodes are in normal mode 
3512            and not in recovery mode 
3513         */
3514         switch (verify_recmode(ctdb, nodemap)) {
3515         case MONITOR_RECOVERY_NEEDED:
3516                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3517                 return;
3518         case MONITOR_FAILED:
3519                 return;
3520         case MONITOR_ELECTION_NEEDED:
3521                 /* can not happen */
3522         case MONITOR_OK:
3523                 break;
3524         }
3525
3526
3527         if (ctdb->tunable.verify_recovery_lock != 0) {
3528                 /* we should have the reclock - check it's not stale */
3529                 ret = check_recovery_lock(ctdb);
3530                 if (ret != 0) {
3531                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3532                         ctdb_set_culprit(rec, ctdb->pnn);
3533                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3534                         return;
3535                 }
3536         }
3537
3538
3539         /* is there a pending reload all ips ? */
3540         if (reload_all_ips_request != NULL) {
3541                 reload_all_ips(ctdb, rec, nodemap, reload_all_ips_request);
3542                 talloc_free(reload_all_ips_request);
3543                 reload_all_ips_request = NULL;
3544         }
3545
3546         /* if there are takeovers requested, perform them and notify the waiters */
3547         if (rec->reallocate_callers) {
3548                 process_ipreallocate_requests(ctdb, rec);
3549         }
3550
3551         /* get the nodemap for all active remote nodes
3552          */
3553         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3554         if (remote_nodemaps == NULL) {
3555                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3556                 return;
3557         }
3558         for(i=0; i<nodemap->num; i++) {
3559                 remote_nodemaps[i] = NULL;
3560         }
3561         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3562                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3563                 return;
3564         } 
3565
3566         /* verify that all other nodes have the same nodemap as we have
3567         */
3568         for (j=0; j<nodemap->num; j++) {
3569                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3570                         continue;
3571                 }
3572
3573                 if (remote_nodemaps[j] == NULL) {
3574                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3575                         ctdb_set_culprit(rec, j);
3576
3577                         return;
3578                 }
3579
3580                 /* if the nodes disagree on how many nodes there are
3581                    then this is a good reason to try recovery
3582                  */
3583                 if (remote_nodemaps[j]->num != nodemap->num) {
3584                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3585                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3586                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3587                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3588                         return;
3589                 }
3590
3591                 /* if the nodes disagree on which nodes exist and are
3592                    active, then that is also a good reason to do recovery
3593                  */
3594                 for (i=0;i<nodemap->num;i++) {
3595                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3596                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3597                                           nodemap->nodes[j].pnn, i, 
3598                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3599                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3600                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3601                                             vnnmap);
3602                                 return;
3603                         }
3604                 }
3605
3606                 /* verify the flags are consistent
3607                 */
3608                 for (i=0; i<nodemap->num; i++) {
3609                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3610                                 continue;
3611                         }
3612                         
3613                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3614                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3615                                   nodemap->nodes[j].pnn, 
3616                                   nodemap->nodes[i].pnn, 
3617                                   remote_nodemaps[j]->nodes[i].flags,
3618                                   nodemap->nodes[i].flags));
3619                                 if (i == j) {
3620                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3621                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3622                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3623                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3624                                                     vnnmap);
3625                                         return;
3626                                 } else {
3627                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3628                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3629                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3630                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3631                                                     vnnmap);
3632                                         return;
3633                                 }
3634                         }
3635                 }
3636         }
3637
3638
3639         /* there had better be the same number of lmasters in the vnnmap
3640            as there are active nodes, or we will have to do a recovery
3641          */
3642         if (vnnmap->size != rec->num_active) {
3643                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3644                           vnnmap->size, rec->num_active));
3645                 ctdb_set_culprit(rec, ctdb->pnn);
3646                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3647                 return;
3648         }
3649
3650         /* verify that all active nodes in the nodemap also exist in 
3651            the vnnmap.
3652          */
3653         for (j=0; j<nodemap->num; j++) {
3654                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3655                         continue;
3656                 }
3657                 if (nodemap->nodes[j].pnn == pnn) {
3658                         continue;
3659                 }
3660
3661                 for (i=0; i<vnnmap->size; i++) {
3662                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3663                                 break;
3664                         }
3665                 }
3666                 if (i == vnnmap->size) {
3667                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3668                                   nodemap->nodes[j].pnn));
3669                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3670                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3671                         return;
3672                 }
3673         }
3674
3675         
3676         /* verify that all other nodes have the same vnnmap
3677            and are from the same generation
3678          */
3679         for (j=0; j<nodemap->num; j++) {
3680                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3681                         continue;
3682                 }
3683                 if (nodemap->nodes[j].pnn == pnn) {
3684                         continue;
3685                 }
3686
3687                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3688                                           mem_ctx, &remote_vnnmap);
3689                 if (ret != 0) {
3690                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3691                                   nodemap->nodes[j].pnn));
3692                         return;
3693                 }
3694
3695                 /* verify the vnnmap generation is the same */
3696                 if (vnnmap->generation != remote_vnnmap->generation) {
3697                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3698                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3699                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3700                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3701                         return;
3702                 }
3703
3704                 /* verify the vnnmap size is the same */
3705                 if (vnnmap->size != remote_vnnmap->size) {
3706                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3707                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3708                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3709                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3710                         return;
3711                 }
3712
3713                 /* verify the vnnmap is the same */
3714                 for (i=0;i<vnnmap->size;i++) {
3715                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3716                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3717                                           nodemap->nodes[j].pnn));
3718                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3719                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3720                                             vnnmap);
3721                                 return;
3722                         }
3723                 }
3724         }
3725
3726         /* we might need to change who has what IP assigned */
3727         if (rec->need_takeover_run) {
3728                 uint32_t culprit = (uint32_t)-1;
3729
3730                 rec->need_takeover_run = false;
3731
3732                 /* update the list of public ips that a node can handle for
3733                    all connected nodes
3734                 */
3735                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3736                 if (ret != 0) {
3737                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3738                                          culprit));
3739                         rec->need_takeover_run = true;
3740                         return;
3741                 }
3742
3743                 /* execute the "startrecovery" event script on all nodes */
3744                 ret = run_startrecovery_eventscript(rec, nodemap);
3745                 if (ret!=0) {
3746                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3747                         ctdb_set_culprit(rec, ctdb->pnn);
3748                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3749                         return;
3750                 }
3751
3752                 ret = ctdb_takeover_run(ctdb, nodemap);
3753                 if (ret != 0) {
3754                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Try again later\n"));
3755                         return;
3756                 }
3757
3758                 /* execute the "recovered" event script on all nodes */
3759                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
3760 #if 0
3761 // we can't check whether the event completed successfully
3762 // since this script WILL fail if the node is in recovery mode
3763 // and if that race happens, the code here would just cause a second
3764 // cascading recovery.
3765                 if (ret!=0) {
3766                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3767                         ctdb_set_culprit(rec, ctdb->pnn);
3768                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3769                 }
3770 #endif
3771         }
3772 }
3773
3774 /*
3775   the main monitoring loop
3776  */
3777 static void monitor_cluster(struct ctdb_context *ctdb)
3778 {
3779         struct ctdb_recoverd *rec;
3780
3781         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3782
3783         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3784         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3785
3786         rec->ctdb = ctdb;
3787
3788         rec->priority_time = timeval_current();
3789
3790         /* register a message port for sending memory dumps */
3791         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3792
3793         /* register a message port for recovery elections */
3794         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3795
3796         /* when nodes are disabled/enabled */
3797         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3798
3799         /* when we are asked to push out a flag change */
3800         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3801
3802         /* register a message port for vacuum fetch */
3803         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3804
3805         /* register a message port for reloadnodes  */
3806         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3807
3808         /* register a message port for performing a takeover run */
3809         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3810
3811         /* register a message port for performing a reload all ips */
3812         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_ALL_IPS, ip_reloadall_handler, rec);
3813
3814         /* register a message port for disabling the ip check for a short while */
3815         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3816
3817         /* register a message port for updating the recovery daemons node assignment for an ip */
3818         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3819
3820         /* register a message port for forcing a rebalance of a node next
3821            reallocation */
3822         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3823
3824         for (;;) {
3825                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3826                 struct timeval start;
3827                 double elapsed;
3828
3829                 if (!mem_ctx) {
3830                         DEBUG(DEBUG_CRIT,(__location__
3831                                           " Failed to create temp context\n"));
3832                         exit(-1);
3833                 }
3834
3835                 start = timeval_current();
3836                 main_loop(ctdb, rec, mem_ctx);
3837                 talloc_free(mem_ctx);
3838
3839                 /* we only check for recovery once every second */
3840                 elapsed = timeval_elapsed(&start);
3841                 if (elapsed < ctdb->tunable.recover_interval) {
3842                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3843                                           - elapsed);
3844                 }
3845         }
3846 }
3847
3848 /*
3849   event handler for when the main ctdbd dies
3850  */
3851 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3852                                  uint16_t flags, void *private_data)
3853 {
3854         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3855         _exit(1);
3856 }
3857
3858 /*
3859   called regularly to verify that the recovery daemon is still running
3860  */
3861 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3862                               struct timeval yt, void *p)
3863 {
3864         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3865
3866         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3867                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3868
3869                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3870                                 ctdb_restart_recd, ctdb);
3871
3872                 return;
3873         }
3874
3875         event_add_timed(ctdb->ev, ctdb, 
3876                         timeval_current_ofs(30, 0),
3877                         ctdb_check_recd, ctdb);
3878 }
3879
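/* SIGCHLD handler for the recovery daemon: reap any exited child processes
   so they do not remain as zombies. */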
3880 static void recd_sig_child_handler(struct event_context *ev,
3881         struct signal_event *se, int signum, int count,
3882         void *dont_care, 
3883         void *private_data)
3884 {
3885 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3886         int status;
3887         pid_t pid = -1;
3888
3889         while (pid != 0) {
3890                 pid = waitpid(-1, &status, WNOHANG);
3891                 if (pid == -1) {
3892                         if (errno != ECHILD) {
3893                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3894                         }
3895                         return;
3896                 }
3897                 if (pid > 0) {
3898                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3899                 }
3900         }
3901 }
3902
3903 /*
3904   startup the recovery daemon as a child of the main ctdb daemon
3905  */
3906 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3907 {
3908         int fd[2];
3909         struct signal_event *se;
3910         struct tevent_fd *fde;
3911
3912         if (pipe(fd) != 0) {
3913                 return -1;
3914         }
3915
3916         ctdb->ctdbd_pid = getpid();
3917
3918         ctdb->recoverd_pid = ctdb_fork(ctdb);
3919         if (ctdb->recoverd_pid == -1) {
3920                 return -1;
3921         }
3922         
3923         if (ctdb->recoverd_pid != 0) {
3924                 close(fd[0]);
3925                 event_add_timed(ctdb->ev, ctdb, 
3926                                 timeval_current_ofs(30, 0),
3927                                 ctdb_check_recd, ctdb);
3928                 return 0;
3929         }
3930
3931         close(fd[1]);
3932
3933         srandom(getpid() ^ time(NULL));
3934
3935         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3936                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3937                 exit(1);
3938         }
3939
3940         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3941
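        /* The parent (main ctdbd) keeps the write end of the pipe open; when
           it exits, the read end becomes readable (EOF) and
           ctdb_recoverd_parent() terminates the recovery daemon. */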
3942         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3943                      ctdb_recoverd_parent, &fd[0]);     
3944         tevent_fd_set_auto_close(fde);
3945
3946         /* set up a handler to pick up sigchld */
3947         se = event_add_signal(ctdb->ev, ctdb,
3948                                      SIGCHLD, 0,
3949                                      recd_sig_child_handler,
3950                                      ctdb);
3951         if (se == NULL) {
3952                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3953                 exit(1);
3954         }
3955
3956         monitor_cluster(ctdb);
3957
3958         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3959         return -1;
3960 }
3961
3962 /*
3963   shutdown the recovery daemon
3964  */
3965 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3966 {
3967         if (ctdb->recoverd_pid == 0) {
3968                 return;
3969         }
3970
3971         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3972         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
3973 }
3974
3975 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
3976                        struct timeval t, void *private_data)
3977 {
3978         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3979
3980         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3981         ctdb_stop_recoverd(ctdb);
3982         ctdb_start_recoverd(ctdb);
3983 }